Release rocketmq-jms 1.0.0 version
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..485dee6
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+.idea
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..07a5fa6
--- /dev/null
+++ b/README.md
@@ -0,0 +1,32 @@
+# RocketMQ Externals
+
+There are some RocketMQ external projects, with the purpose of growing the RocketMQ community.
+
+## RocketMQ-Console-Ng
+A console for RocketMQ
+
+## RocketMQ-JMS
+RocketMQ-JMS is an implement of JMS specification,taking Apache RocketMQ as broker. Now we are on the way of supporting JMS 1.1 and JMS2.0 is our final target.
+
+## RocketMQ-Flume-Ng
+
+This project is used to receive and send messages between
+[RocketMQ](http://rocketmq.incubator.apache.org/) and [Flume-ng](https://github.com/apache/flume)
+
+1. Firstly, please get familiar with [RocketMQ](http://rocketmq.incubator.apache.org/) and [Flume-ng](https://github.com/apache/flume).
+2. Ensure that the jar related to [RocketMQ](http://rocketmq.incubator.apache.org/dowloading/releases) exists in local maven repository.
+3. Execute the following command in rocketmq-flume root directory
+
+   `mvn clean install dependency:copy-dependencies`
+
+4. Copy the jar depended by rocketmq-flume to `$FLUME_HOME/lib`(the specific jar will be given later)
+
+
+## RocketMQ-Spark
+
+Apache Spark-Streaming integration with RocketMQ. Both push & pull consumer mode are provided.
+For more details please refer to rocketmq-spark README.md.
+
+## RocketMQ-Docker
+Apache RocketMQ Docker provides Dockerfile and bash scripts for building and running docker image.
+
diff --git a/rocketmq-jms/.gitignore b/rocketmq-jms/.gitignore
new file mode 100644
index 0000000..d2e5aaf
--- /dev/null
+++ b/rocketmq-jms/.gitignore
@@ -0,0 +1,5 @@
+.idea/
+*.iml
+*.ipr
+*.iws
+target/
diff --git a/rocketmq-jms/.travis.yml b/rocketmq-jms/.travis.yml
new file mode 100644
index 0000000..9f430b2
--- /dev/null
+++ b/rocketmq-jms/.travis.yml
@@ -0,0 +1,43 @@
+notifications:
+  email:
+    recipients:
+      - zhangke.huangshan@gmail.com
+      - zhendongliu92@gmail.com
+  on_success: change
+  on_failure: always
+
+language: java
+
+matrix:
+  include:
+  # On OSX, run with default JDK only.
+  # - os: osx
+  # On Linux, run with specific JDKs only.
+  # - os: linux
+  #  env: CUSTOM_JDK="oraclejdk8"
+  - os: linux
+    env: CUSTOM_JDK="oraclejdk7"
+  #- os: linux
+  #  env: CUSTOM_JDK="openjdk7"
+
+before_install:
+  - echo 'MAVEN_OPTS="$MAVEN_OPTS -Xmx1024m -XX:MaxPermSize=512m -XX:+BytecodeVerificationLocal"' >> ~/.mavenrc
+  - cat ~/.mavenrc
+  - if [ "$TRAVIS_OS_NAME" == "osx" ]; then export JAVA_HOME=$(/usr/libexec/java_home); fi
+  - if [ "$TRAVIS_OS_NAME" == "linux" ]; then jdk_switcher use "$CUSTOM_JDK"; fi
+
+#os:
+#  - linux
+#  - osx
+#jdk:
+#  - oraclejdk8
+#  - oraclejdk7
+#  - openjdk7
+
+
+script:
+  - travis_retry mvn -B clean install jacoco:report coveralls:report
+
+#after_success:
+#  - mvn clean install
+#  - mvn sonar:sonar
diff --git a/rocketmq-jms/README.md b/rocketmq-jms/README.md
new file mode 100644
index 0000000..a05e27e
--- /dev/null
+++ b/rocketmq-jms/README.md
@@ -0,0 +1,31 @@
+# RocketMQ-JMS   [![Build Status](https://travis-ci.org/rocketmq/rocketmq-jms.svg?branch=master)](https://travis-ci.org/rocketmq/rocketmq-jms) [![Coverage Status](https://coveralls.io/repos/github/rocketmq/rocketmq-jms/badge.svg?branch=master)](https://coveralls.io/github/rocketmq/rocketmq-jms?branch=master)
+
+
+## Introduction
+RocketMQ-JMS is an implement of JMS specification,taking Apache RocketMQ as broker.
+Now we are on the way of supporting JMS 1.1 and JMS2.0 is our final target.   
+
+Now RocketMQ-JMS will release the first version soon, and new features will be developed on the branch "v1.1".
+Please visit the [issue board](https://github.com/rocketmq/rocketmq-jms/issues) to see features in next version. 
+
+
+## Building
+
+  > cd rocketmq-jms  
+  > mvn clean install  
+  
+  **run unit test:**  
+  > mvn test    
+  
+  **run integration test:**  
+  > mvn verify
+  
+  **see jacoco code coverage report**
+  > open core/target/site/jacoco/index.html  
+  > open core/target/site/jacoco-it/index.html  
+  > open spring/target/site/jacoco-it/index.html 
+  
+  
+## Guidelines
+
+ Please see [Coding Guidelines Introduction](http://rocketmq.apache.org/docs/code-guidelines/)
diff --git a/rocketmq-jms/core/pom.xml b/rocketmq-jms/core/pom.xml
new file mode 100644
index 0000000..c1ed1d0
--- /dev/null
+++ b/rocketmq-jms/core/pom.xml
@@ -0,0 +1,48 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>rocketmq-jms-all</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
+        <version>1.0.0</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>rocketmq-jms</artifactId>
+    <version>1.0.0</version>
+
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java
new file mode 100644
index 0000000..80a8b64
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonConstant.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+public interface CommonConstant {
+
+    String PRODUCERID = "producerId";
+
+    String CONSUMERID = "consumerId";
+
+    String PROVIDER = "provider";
+
+    String NAMESERVER = "nameServer";
+
+    String INSTANCE_NAME = "instanceName";
+
+    String CONSUME_THREAD_NUMS = "consumeThreadNums";
+
+    String SEND_TIMEOUT_MILLIS = "sendMsgTimeoutMillis";
+
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java
new file mode 100644
index 0000000..c8e4276
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/CommonContext.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import org.apache.commons.lang.builder.ReflectionToStringBuilder;
+import org.apache.commons.lang.builder.ToStringStyle;
+
+public class CommonContext {
+    private String accessKey;
+    private String secretKey;
+
+    private String consumerId;
+    private String producerId;
+
+    private String provider;
+
+    private String appId;
+
+    private String nameServer;
+
+    /**
+     * MQType
+     */
+    private String mqType;
+
+    /**
+     * Using for distinguishing client jvm process
+     */
+    private String instanceName;
+    /**
+     * Set consumer threadPool Size
+     */
+    private int consumeThreadNums;
+    /**
+     * Set send message timeOut
+     */
+    private int sendMsgTimeoutMillis = -1;
+
+    /**
+     * @return the appId
+     */
+    public String getAppId() {
+        return appId;
+    }
+
+    /**
+     * @param appId the appId to set
+     */
+    public void setAppId(String appId) {
+        this.appId = appId;
+    }
+
+    /**
+     * @return the provider
+     */
+    public String getProvider() {
+        return provider;
+    }
+
+    /**
+     * @param provider the provider to set
+     */
+    public void setProvider(String provider) {
+        this.provider = provider;
+    }
+
+    /**
+     * @return the instanceName
+     */
+    public String getInstanceName() {
+        return instanceName;
+    }
+
+    /**
+     * @param instanceName the instanceName to set
+     */
+    public void setInstanceName(String instanceName) {
+        this.instanceName = instanceName;
+    }
+
+    /**
+     * @return the accessKey
+     */
+    public String getAccessKey() {
+        return accessKey;
+    }
+
+    /**
+     * @param accessKey the accessKey to set
+     */
+    public void setAccessKey(String accessKey) {
+        this.accessKey = accessKey;
+    }
+
+    /**
+     * @return the secretKey
+     */
+    public String getSecretKey() {
+        return secretKey;
+    }
+
+    /**
+     * @param secretKey the secretKey to set
+     */
+    public void setSecretKey(String secretKey) {
+        this.secretKey = secretKey;
+    }
+
+    /**
+     * @return consumer thread nums
+     */
+    public int getConsumeThreadNums() {
+        return consumeThreadNums;
+    }
+
+    /**
+     * @param consumeThreadNums
+     */
+    public void setConsumeThreadNums(int consumeThreadNums) {
+        this.consumeThreadNums = consumeThreadNums;
+    }
+
+    public String getConsumerId() {
+        return consumerId;
+    }
+
+    public void setConsumerId(String consumerId) {
+        this.consumerId = consumerId;
+    }
+
+    public String getProducerId() {
+        return producerId;
+    }
+
+    public void setProducerId(String producerId) {
+        this.producerId = producerId;
+    }
+
+    public int getSendMsgTimeoutMillis() {
+        return sendMsgTimeoutMillis;
+    }
+
+    public void setSendMsgTimeoutMillis(int sendMsgTimeoutMillis) {
+        this.sendMsgTimeoutMillis = sendMsgTimeoutMillis;
+    }
+
+    public String getMqType() {
+        return mqType;
+    }
+
+    public void setMqType(String mqType) {
+        this.mqType = mqType;
+    }
+
+    public String getNameServer() {
+        return nameServer;
+    }
+
+    public void setNameServer(String nameServer) {
+        this.nameServer = nameServer;
+    }
+
+    @Override
+    public String toString() {
+        return ReflectionToStringBuilder.toString(this, ToStringStyle.DEFAULT_STYLE);
+    }
+
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java
new file mode 100644
index 0000000..4c809c7
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnection.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Preconditions;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.Topic;
+import org.apache.commons.lang.StringUtils;
+
+public class JmsBaseConnection implements Connection {
+    private final AtomicBoolean started = new AtomicBoolean(false);
+    protected String clientID;
+    protected ExceptionListener exceptionListener;
+    protected CommonContext context;
+    protected JmsBaseSession session;
+
+    public JmsBaseConnection(Map<String, String> connectionParams) {
+
+        this.clientID = UUID.randomUUID().toString();
+
+        context = new CommonContext();
+
+        //At lease one should be set
+        context.setProducerId(connectionParams.get(CommonConstant.PRODUCERID));
+        context.setConsumerId(connectionParams.get(CommonConstant.CONSUMERID));
+
+        //optional
+        context.setProvider(connectionParams.get(CommonConstant.PROVIDER));
+
+        String nameServer = connectionParams.get(CommonConstant.NAMESERVER);
+        String consumerThreadNums = connectionParams.get(CommonConstant.CONSUME_THREAD_NUMS);
+        String sendMsgTimeoutMillis = connectionParams.get(CommonConstant.SEND_TIMEOUT_MILLIS);
+        String instanceName = connectionParams.get(CommonConstant.INSTANCE_NAME);
+
+        if (StringUtils.isNotEmpty(nameServer)) {
+            context.setNameServer(nameServer);
+        }
+        if (StringUtils.isNotEmpty(instanceName)) {
+            context.setInstanceName(connectionParams.get(CommonConstant.INSTANCE_NAME));
+        }
+
+        if (StringUtils.isNotEmpty(consumerThreadNums)) {
+            context.setConsumeThreadNums(Integer.parseInt(consumerThreadNums));
+        }
+        if (StringUtils.isNotEmpty(sendMsgTimeoutMillis)) {
+            context.setSendMsgTimeoutMillis(Integer.parseInt(sendMsgTimeoutMillis));
+        }
+    }
+
+    @Override
+    public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
+
+        Preconditions.checkArgument(!transacted, "Not support transaction Session !");
+        Preconditions.checkArgument(Session.AUTO_ACKNOWLEDGE == acknowledgeMode,
+            "Not support this acknowledge mode: " + acknowledgeMode);
+
+        if (null != this.session) {
+            return this.session;
+        }
+        synchronized (this) {
+            if (null != this.session) {
+                return this.session;
+            }
+            this.session = new JmsBaseSession(this, transacted, acknowledgeMode, context);
+            if (isStarted()) {
+                this.session.start();
+            }
+            return this.session;
+        }
+    }
+
+    @Override
+    public String getClientID() throws JMSException {
+        return this.clientID;
+    }
+
+    @Override
+    public void setClientID(String clientID) throws JMSException {
+        this.clientID = clientID;
+    }
+
+    @Override
+    public ConnectionMetaData getMetaData() throws JMSException {
+        return new JmsBaseConnectionMetaData();
+    }
+
+    @Override
+    public ExceptionListener getExceptionListener() throws JMSException {
+        return this.exceptionListener;
+    }
+
+    @Override
+    public void setExceptionListener(ExceptionListener listener) throws JMSException {
+        this.exceptionListener = listener;
+    }
+
+    @Override
+    public void start() throws JMSException {
+        if (started.compareAndSet(false, true)) {
+            if (this.session != null) {
+                this.session.start();
+            }
+
+        }
+    }
+
+    @Override
+    public void stop() throws JMSException {
+        //Stop the connection before closing it.
+        //Do nothing here.
+    }
+
+    @Override
+    public void close() throws JMSException {
+        if (started.compareAndSet(true, false)) {
+            if (this.session != null) {
+                this.session.close();
+            }
+
+        }
+    }
+
+    @Override
+    public ConnectionConsumer createConnectionConsumer(Destination destination,
+        String messageSelector,
+        ServerSessionPool sessionPool,
+        int maxMessages) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
+        String messageSelector,
+        ServerSessionPool sessionPool,
+        int maxMessages) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Whether the connection is started.
+     *
+     * @return whether the connection is started.
+     */
+    public boolean isStarted() {
+        return started.get();
+    }
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java
new file mode 100644
index 0000000..1b9da06
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionFactory.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Preconditions;
+import java.net.URI;
+import java.util.Map;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import org.apache.rocketmq.jms.util.URISpecParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JmsBaseConnectionFactory implements ConnectionFactory {
+
+    private static Logger logger = LoggerFactory
+        .getLogger(JmsBaseConnectionFactory.class);
+    /**
+     * Synchronization monitor for the shared Connection
+     */
+    private final Object connectionMonitor = new Object();
+    /**
+     * Can be configured in a consistent way without too much URL hacking.
+     */
+    protected URI connectionUri;
+    /**
+     * Store connection uri query parameters.
+     */
+    protected Map<String, String> connectionParams;
+    /**
+     * Wrapped Connection
+     */
+    protected JmsBaseConnection connection;
+
+    public JmsBaseConnectionFactory() {
+
+    }
+
+    public JmsBaseConnectionFactory(URI connectionUri) {
+        setConnectionUri(connectionUri);
+    }
+
+    public void setConnectionUri(URI connectionUri) {
+        Preconditions.checkNotNull(connectionUri, "Please set URI !");
+        this.connectionUri = connectionUri;
+        this.connectionParams = URISpecParser.parseURI(connectionUri.toString());
+
+        if (null != connectionParams) {
+            Preconditions.checkState(null != connectionParams.get(CommonConstant.CONSUMERID) ||
+                null != connectionParams.get(CommonConstant.PRODUCERID), "Please set consumerId or ProducerId !");
+        }
+
+    }
+
+    @Override
+    public Connection createConnection() throws JMSException {
+        synchronized (this.connectionMonitor) {
+            if (this.connection == null) {
+                initConnection();
+            }
+            return this.connection;
+        }
+    }
+
+    /**
+     * Using userName and Password to create a connection
+     *
+     * @param userName ignored
+     * @param password ignored
+     * @return the new JMS Connection
+     * @throws JMSException
+     */
+    @Override
+    public Connection createConnection(String userName, String password) throws JMSException {
+        logger.debug("Using userName and Password to create a connection.");
+        return this.createConnection();
+    }
+
+    /**
+     * Initialize the underlying shared Connection.
+     * <p/>
+     * Closes and reInitializes the Connection if an underlying Connection is present already.
+     *
+     * @throws javax.jms.JMSException if thrown by JMS API methods
+     */
+    protected void initConnection() throws JMSException {
+        synchronized (this.connectionMonitor) {
+            if (this.connection != null) {
+                closeConnection(this.connection);
+            }
+            this.connection = doCreateConnection();
+            logger.debug("Established shared JMS Connection: {}", this.connection);
+        }
+    }
+
+    /**
+     * Close the given Connection.
+     *
+     * @param con the Connection to close
+     */
+    protected void closeConnection(Connection con) {
+        logger.debug("Closing shared JMS Connection: {}", this.connection);
+        try {
+            try {
+                con.stop();
+            }
+            finally {
+                con.close();
+            }
+        }
+        catch (Throwable ex) {
+            logger.error("Could not close shared JMS Connection.", ex);
+        }
+    }
+
+    /**
+     * Create a JMS Connection
+     *
+     * @return the new JMS Connection
+     * @throws javax.jms.JMSException if thrown by JMS API methods
+     */
+    protected JmsBaseConnection doCreateConnection() throws JMSException {
+        Preconditions.checkState(null != this.connectionParams && this.connectionParams.size() > 0,
+            "Connection Parameters can not be null!");
+        this.connection = new JmsBaseConnection(this.connectionParams);
+
+        return connection;
+    }
+
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java
new file mode 100644
index 0000000..ee549aa
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConnectionMetaData.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Enumeration;
+import java.util.Properties;
+import java.util.Vector;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import javax.jms.ConnectionMetaData;
+import javax.jms.JMSException;
+
+public class JmsBaseConnectionMetaData implements ConnectionMetaData {
+    public static final String JMS_VERSION;
+    public static final int JMS_MAJOR_VERSION;
+    public static final int JMS_MINOR_VERSION;
+
+    public static final String PROVIDER_VERSION;
+    public static final int PROVIDER_MAJOR_VERSION;
+    public static final int PROVIDER_MINOR_VERSION;
+
+    public static final String PROVIDER_NAME = "Apache RocketMQ";
+
+    public static final JmsBaseConnectionMetaData INSTANCE = new JmsBaseConnectionMetaData();
+
+    public static InputStream resourceStream;
+
+    static {
+        Pattern pattern = Pattern.compile("(\\d+)\\.(\\d+).*");
+
+        String jmsVersion = null;
+        int jmsMajor = 0;
+        int jmsMinor = 0;
+        try {
+            Package p = Package.getPackage("javax.jms");
+            if (p != null) {
+                jmsVersion = p.getImplementationVersion();
+                Matcher m = pattern.matcher(jmsVersion);
+                if (m.matches()) {
+                    jmsMajor = Integer.parseInt(m.group(1));
+                    jmsMinor = Integer.parseInt(m.group(2));
+                }
+            }
+        }
+        catch (Throwable e) {
+        }
+        JMS_VERSION = jmsVersion;
+        JMS_MAJOR_VERSION = jmsMajor;
+        JMS_MINOR_VERSION = jmsMinor;
+
+        String providerVersion = null;
+        int providerMajor = 0;
+        int providerMinor = 0;
+        Properties properties = new Properties();
+        try {
+            resourceStream = JmsBaseConnectionMetaData.class.getResourceAsStream("/application.conf");
+            properties.load(resourceStream);
+            providerVersion = properties.getProperty("version");
+
+            Matcher m = pattern.matcher(providerVersion);
+            if (m.matches()) {
+                providerMajor = Integer.parseInt(m.group(1));
+                providerMinor = Integer.parseInt(m.group(2));
+            }
+        }
+        catch (IOException e) {
+            e.printStackTrace();
+        }
+        PROVIDER_VERSION = providerVersion;
+        PROVIDER_MAJOR_VERSION = providerMajor;
+        PROVIDER_MINOR_VERSION = providerMinor;
+
+    }
+
+    public String getJMSVersion() throws JMSException {
+        return JMS_VERSION;
+    }
+
+    public int getJMSMajorVersion() throws JMSException {
+        return JMS_MAJOR_VERSION;
+    }
+
+    public int getJMSMinorVersion() throws JMSException {
+        return JMS_MINOR_VERSION;
+    }
+
+    public String getJMSProviderName() throws JMSException {
+        return PROVIDER_NAME;
+    }
+
+    public String getProviderVersion() throws JMSException {
+        return PROVIDER_VERSION;
+    }
+
+    public int getProviderMajorVersion() throws JMSException {
+        return PROVIDER_MAJOR_VERSION;
+    }
+
+    public int getProviderMinorVersion() throws JMSException {
+        return PROVIDER_MINOR_VERSION;
+    }
+
+    public Enumeration<?> getJMSXPropertyNames() throws JMSException {
+        Vector<String> jmxProperties = new Vector<String>();
+        jmxProperties.add("jmsXUserId");
+        jmxProperties.add("jmsXAppId");
+        jmxProperties.add("jmsXGroupID");
+        jmxProperties.add("jmsXGroupSeq");
+        jmxProperties.add("jmsXState");
+        jmxProperties.add("jmsXDeliveryCount");
+        jmxProperties.add("jmsXProducerTXID");
+        jmxProperties.add("jmsConsumerTXID");
+        jmxProperties.add("jmsRecvTimeStamp");
+        return jmxProperties.elements();
+    }
+
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java
new file mode 100644
index 0000000..f0bca28
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseConstant.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+public interface JmsBaseConstant {
+    //------------------------JMS message header constant---------------------------------
+    String JMS_DESTINATION = "jmsDestination";
+    String JMS_DELIVERY_MODE = "jmsDeliveryMode";
+    String JMS_EXPIRATION = "jmsExpiration";
+    String JMS_DELIVERY_TIME = "jmsDeliveryTime";
+    String JMS_PRIORITY = "jmsPriority";
+    String JMS_MESSAGE_ID = "jmsMessageID";
+    String JMS_TIMESTAMP = "jmsTimestamp";
+    String JMS_CORRELATION_ID = "jmsCorrelationID";
+    String JMS_REPLY_TO = "jmsReplyTo";
+    String JMS_TYPE = "jmsType";
+    String JMS_REDELIVERED = "jmsRedelivered";
+
+    //-------------------------JMS defined properties constant----------------------------
+    /**
+     * The identity of the user sending the Send message
+     */
+    String JMS_XUSER_ID = "jmsXUserID";
+    /**
+     * The identity of the application Send sending the message
+     */
+    String JMS_XAPP_ID = "jmsXAppID";
+    /**
+     * The number of message delivery Receive attempts
+     */
+    String JMS_XDELIVERY_COUNT = "jmsXDeliveryCount";
+    /**
+     * The identity of the message group this message is part of
+     */
+    String JMS_XGROUP_ID = "jmsXGroupID";
+    /**
+     * The sequence number of this message within the group; the first message is 1, the second 2,...
+     */
+    String JMS_XGROUP_SEQ = "jmsXGroupSeq";
+    /**
+     * The transaction identifier of the Send transaction within which this message was produced
+     */
+    String JMS_XPRODUCER_TXID = "jmsXProducerTXID";
+    /**
+     * The transaction identifier of the Receive transaction within which this message was consumed
+     */
+    String JMS_XCONSUMER_TXID = "jmsXConsumerTXID";
+
+    /**
+     * The time JMS delivered the Receive message to the consumer
+     */
+    String JMS_XRCV_TIMESTAMP = "jmsXRcvTimestamp";
+    /**
+     * Assume there exists a message warehouse that contains a separate copy of each message sent to each consumer and
+     * that these copies exist from the time the original message was sent. Each copy’s state is one of: 1(waiting),
+     * 2(ready), 3(expired) or 4(retained) Since state is of no interest to producers and consumers it is not provided
+     * to either. It is only of relevance to messages looked up in a warehouse and JMS provides no API for this.
+     */
+    String JMS_XSTATE = "jmsXState";
+
+    //---------------------------JMS Headers' value constant---------------------------
+    /**
+     * Default time to live
+     */
+    long DEFAULT_TIME_TO_LIVE = 3 * 24 * 60 * 60 * 1000;
+
+    /**
+     * Default Jms Type
+     */
+    String DEFAULT_JMS_TYPE = "rocketmq";
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java
new file mode 100644
index 0000000..b62e928
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageConsumer.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.MapMaker;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.jms.util.ExceptionUtil;
+
+public class JmsBaseMessageConsumer implements MessageConsumer {
+
+    private static final Object LOCK_OBJECT = new Object();
+    //all shared consumers
+    private static ConcurrentMap<String/**consumerId*/, RMQPushConsumerExt> consumerMap = new MapMaker().makeMap();
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private CommonContext context;
+    private Destination destination;
+    private MessageListener messageListener;
+
+    public JmsBaseMessageConsumer(Destination destination, CommonContext commonContext,
+        JmsBaseConnection connection) throws JMSException {
+        synchronized (LOCK_OBJECT) {
+            checkArgs(destination, commonContext);
+
+            if (null == consumerMap.get(context.getConsumerId())) {
+                DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(context.getConsumerId());
+                if (context.getConsumeThreadNums() > 0) {
+                    consumer.setConsumeThreadMax(context.getConsumeThreadNums());
+                    consumer.setConsumeThreadMin(context.getConsumeThreadNums());
+                }
+                if (!Strings.isNullOrEmpty(context.getNameServer())) {
+                    consumer.setNamesrvAddr(context.getNameServer());
+                }
+                if (!Strings.isNullOrEmpty(context.getInstanceName())) {
+                    consumer.setInstanceName(context.getInstanceName());
+                }
+                consumer.setConsumeMessageBatchMaxSize(1);
+                //add subscribe?
+                RMQPushConsumerExt rocketmqConsumerExt = new RMQPushConsumerExt(consumer);
+                consumerMap.putIfAbsent(context.getConsumerId(), rocketmqConsumerExt);
+            }
+
+            consumerMap.get(context.getConsumerId()).incrementAndGet();
+
+            //If the connection has been started, start the consumer right now.
+            //add start status?
+            RMQPushConsumerExt consumerExt = consumerMap.get(context.getConsumerId());
+            if (connection.isStarted()) {
+                try {
+                    consumerExt.start();
+                }
+                catch (MQClientException mqe) {
+                    JMSException jmsException = new JMSException("Start consumer failed " + context.getConsumerId());
+                    jmsException.initCause(mqe);
+                    throw jmsException;
+                }
+            }
+        }
+
+    }
+
+    private void checkArgs(Destination destination, CommonContext context) throws JMSException {
+        Preconditions.checkNotNull(context.getConsumerId(), "ConsumerId can not be null!");
+        Preconditions.checkNotNull(destination.toString(), "Destination can not be null!");
+        this.context = context;
+        this.destination = destination;
+    }
+
+    @Override
+    public String getMessageSelector() throws JMSException {
+        return null;
+    }
+
+    @Override
+    public MessageListener getMessageListener() throws JMSException {
+        return this.messageListener;
+    }
+
+    @Override
+    public void setMessageListener(MessageListener listener) throws JMSException {
+        RMQPushConsumerExt rocketmqConsumerExt = consumerMap.get(context.getConsumerId());
+        if (null != rocketmqConsumerExt) {
+            try {
+                this.messageListener = listener;
+                String messageTopic = ((JmsBaseTopic) destination).getMessageTopic();
+                String messageType = ((JmsBaseTopic) destination).getMessageType();
+                rocketmqConsumerExt.subscribe(messageTopic, messageType, listener);
+            }
+            catch (MQClientException mqe) {
+                //add what?
+                throw new JMSException(mqe.getMessage());
+            }
+
+        }
+
+    }
+
+    @Override
+    public Message receive() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    @Override
+    public Message receive(long timeout) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    @Override
+    public Message receiveNoWait() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    @Override
+    public void close() throws JMSException {
+        synchronized (LOCK_OBJECT) {
+            if (closed.compareAndSet(false, true)) {
+                RMQPushConsumerExt rocketmqConsumerExt = consumerMap.get(context.getConsumerId());
+                if (null != rocketmqConsumerExt && 0 == rocketmqConsumerExt.decrementAndGet()) {
+                    rocketmqConsumerExt.close();
+                    consumerMap.remove(context.getConsumerId());
+                }
+            }
+        }
+    }
+
+    /**
+     * Start the consumer to get message from the Broker.
+     */
+    public void startConsumer() throws JMSException {
+        RMQPushConsumerExt rocketmqConsumerExt = consumerMap.get(context.getConsumerId());
+        if (null != rocketmqConsumerExt) {
+            try {
+                rocketmqConsumerExt.start();
+            }
+            catch (MQClientException mqe) {
+                throw ExceptionUtil.convertToJmsException(mqe, "Start consumer failed");
+            }
+        }
+    }
+
+    public Destination getDestination() throws JMSException {
+        return this.destination;
+    }
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java
new file mode 100644
index 0000000..8dd82f0
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseMessageProducer.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import com.google.common.collect.MapMaker;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentMap;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.jms.domain.message.JmsBaseMessage;
+import org.apache.rocketmq.jms.domain.message.JmsBytesMessage;
+import org.apache.rocketmq.jms.domain.message.JmsObjectMessage;
+import org.apache.rocketmq.jms.domain.message.JmsTextMessage;
+import org.apache.rocketmq.jms.util.ExceptionUtil;
+import org.apache.rocketmq.jms.util.MessageConverter;
+import org.apache.rocketmq.jms.util.MsgConvertUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JmsBaseMessageProducer implements MessageProducer {
+
+    private static final Object LOCK_OBJECT = new Object();
+    private static ConcurrentMap<String, MQProducer> producerMap = new MapMaker().makeMap();
+    private final Logger logger = LoggerFactory.getLogger(JmsBaseMessageProducer.class);
+    private CommonContext context;
+
+    private Destination destination;
+
+    public JmsBaseMessageProducer(Destination destination, CommonContext context) throws JMSException {
+        synchronized (LOCK_OBJECT) {
+            checkArgs(destination, context);
+
+            if (null == producerMap.get(this.context.getProducerId())) {
+                DefaultMQProducer producer = new DefaultMQProducer(context.getProducerId());
+                if (!Strings.isNullOrEmpty(context.getNameServer())) {
+                    producer.setNamesrvAddr(context.getNameServer());
+                }
+                if (!Strings.isNullOrEmpty(context.getInstanceName())) {
+                    producer.setInstanceName(context.getInstanceName());
+                }
+                if (context.getSendMsgTimeoutMillis() > 0) {
+                    producer.setSendMsgTimeout(context.getSendMsgTimeoutMillis());
+                }
+                try {
+                    producer.start();
+                }
+                catch (MQClientException mqe) {
+                    throw ExceptionUtil.convertToJmsException(mqe, String.format("Start producer failed:%s", context.getProducerId()));
+                }
+                producerMap.putIfAbsent(this.context.getProducerId(), producer);
+            }
+
+        }
+    }
+
+    private void checkArgs(Destination destination, CommonContext context) throws JMSException {
+        Preconditions.checkNotNull(context.getProducerId(), "ProducerId can not be null!");
+        Preconditions.checkNotNull(destination.toString(), "Destination can not be null!");
+        this.context = context;
+        this.destination = destination;
+    }
+
+    @Override
+    public boolean getDisableMessageID() throws JMSException {
+        return false;
+    }
+
+    @Override
+    public void setDisableMessageID(boolean value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public boolean getDisableMessageTimestamp() throws JMSException {
+        return false;
+    }
+
+    @Override
+    public void setDisableMessageTimestamp(boolean value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public int getDeliveryMode() throws JMSException {
+        return javax.jms.Message.DEFAULT_DELIVERY_MODE;
+    }
+
+    @Override
+    public void setDeliveryMode(int deliveryMode) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public int getPriority() throws JMSException {
+        return javax.jms.Message.DEFAULT_PRIORITY;
+    }
+
+    @Override
+    public void setPriority(int defaultPriority) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public long getTimeToLive() throws JMSException {
+        return JmsBaseConstant.DEFAULT_TIME_TO_LIVE;
+    }
+
+    @Override
+    public void setTimeToLive(long timeToLive) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public Destination getDestination() throws JMSException {
+        return this.destination;
+    }
+
+    @Override
+    public void close() throws JMSException {
+        //Nothing to do
+    }
+
+    @Override
+    public void send(javax.jms.Message message) throws JMSException {
+        this.send(getDestination(), message);
+    }
+
+    /**
+     * Send the message to the defined Destination success---return normally. Exception---throw out JMSException.
+     *
+     * @param destination see <CODE>Destination</CODE>
+     * @param message the message to be sent.
+     * @throws javax.jms.JMSException
+     */
+    @Override
+    public void send(Destination destination, javax.jms.Message message) throws JMSException {
+        JmsBaseMessage jmsMsg = (JmsBaseMessage) message;
+        initJMSHeaders(jmsMsg, destination);
+
+        try {
+            if (context == null) {
+                throw new IllegalStateException("Context should be inited");
+            }
+            org.apache.rocketmq.common.message.Message rocketmqMsg = MessageConverter.convert2RMQMessage(jmsMsg);
+
+            MQProducer producer = producerMap.get(context.getProducerId());
+
+            if (producer == null) {
+                throw new Exception("producer is null ");
+            }
+            SendResult sendResult = producer.send(rocketmqMsg);
+            if (sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK) {
+                jmsMsg.setHeader(JmsBaseConstant.JMS_MESSAGE_ID, "ID:" + sendResult.getMsgId());
+            } else {
+                throw new Exception("SendResult is " + (sendResult == null ? "null" : sendResult.toString()));
+            }
+        }
+        catch (Exception e) {
+            logger.error("Send rocketmq message failure !", e);
+            //if fail to send the message, throw out JMSException
+            JMSException jmsException = new JMSException("Send rocketmq message failure!");
+            jmsException.setLinkedException(e);
+            throw jmsException;
+        }
+    }
+
+    @Override
+    public void send(javax.jms.Message message, int deliveryMode, int priority,
+        long timeToLive) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public void send(Destination destination, javax.jms.Message message, int deliveryMode,
+        int priority, long timeToLive) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Init the JmsMessage Headers.
+     * <p/>
+     * <P>JMS providers init message's headers. Do not allow user to set these by yourself.
+     *
+     * @param jmsMsg message
+     * @param destination
+     * @throws javax.jms.JMSException
+     * @see <CODE>Destination</CODE>
+     */
+    private void initJMSHeaders(JmsBaseMessage jmsMsg, Destination destination) throws JMSException {
+
+        //JMS_DESTINATION default:"topic:message"
+        jmsMsg.setHeader(JmsBaseConstant.JMS_DESTINATION, destination);
+        //JMS_DELIVERY_MODE default : PERSISTENT
+        jmsMsg.setHeader(JmsBaseConstant.JMS_DELIVERY_MODE, javax.jms.Message.DEFAULT_DELIVERY_MODE);
+        //JMS_TIMESTAMP default : current time
+        jmsMsg.setHeader(JmsBaseConstant.JMS_TIMESTAMP, System.currentTimeMillis());
+        //JMS_EXPIRATION default :  3 days
+        //JMS_EXPIRATION = currentTime + time_to_live
+        jmsMsg.setHeader(JmsBaseConstant.JMS_EXPIRATION, System.currentTimeMillis() + JmsBaseConstant.DEFAULT_TIME_TO_LIVE);
+        //JMS_PRIORITY default : 4
+        jmsMsg.setHeader(JmsBaseConstant.JMS_PRIORITY, javax.jms.Message.DEFAULT_PRIORITY);
+        //JMS_TYPE default : open notification service
+        jmsMsg.setHeader(JmsBaseConstant.JMS_TYPE, JmsBaseConstant.DEFAULT_JMS_TYPE);
+        //JMS_REPLY_TO,JMS_CORRELATION_ID default : null
+        //JMS_MESSAGE_ID is set by sendResult.
+        //JMS_REDELIVERED is set by broker.
+    }
+
+    /**
+     * Init the OnsMessage Headers.
+     * <p/>
+     * <P>When converting JmsMessage to OnsMessage, should read from the JmsMessage's Properties and write to the
+     * OnsMessage's Properties.
+     *
+     * @param jmsMsg message
+     * @throws javax.jms.JMSException
+     */
+    public static Properties initRocketMQHeaders(JmsBaseMessage jmsMsg,
+        String topic, String messageType) throws JMSException {
+        Properties userProperties = new Properties();
+
+        //Jms userProperties to properties
+        Map<String, Object> userProps = jmsMsg.getProperties();
+        Iterator<Map.Entry<String, Object>> userPropsIter = userProps.entrySet().iterator();
+        while (userPropsIter.hasNext()) {
+            Map.Entry<String, Object> entry = userPropsIter.next();
+            userProperties.setProperty(entry.getKey(), entry.getValue().toString());
+        }
+        //Jms systemProperties to ROCKETMQ properties
+        Map<String, Object> sysProps = jmsMsg.getHeaders();
+        Iterator<Map.Entry<String, Object>> sysPropsIter = sysProps.entrySet().iterator();
+        while (sysPropsIter.hasNext()) {
+            Map.Entry<String, Object> entry = sysPropsIter.next();
+            userProperties.setProperty(entry.getKey(), entry.getValue().toString());
+        }
+
+        //Jms message Model
+        if (jmsMsg instanceof JmsBytesMessage) {
+            userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_BYTES);
+        }
+        else if (jmsMsg instanceof JmsObjectMessage) {
+            userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_OBJ);
+        }
+        else if (jmsMsg instanceof JmsTextMessage) {
+            userProperties.setProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_TEXT);
+        }
+
+        //message topic and tag
+        userProperties.setProperty(MsgConvertUtil.MSG_TOPIC, topic);
+        userProperties.setProperty(MsgConvertUtil.MSG_TYPE, messageType);
+
+        return userProperties;
+    }
+
+}
+
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java
new file mode 100644
index 0000000..5bf7005
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseSession.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.CopyOnWriteArrayList;
+import javax.jms.BytesMessage;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+import org.apache.rocketmq.jms.domain.message.JmsBytesMessage;
+import org.apache.rocketmq.jms.domain.message.JmsObjectMessage;
+import org.apache.rocketmq.jms.domain.message.JmsTextMessage;
+import org.apache.rocketmq.jms.util.ExceptionUtil;
+
+public class JmsBaseSession implements Session {
+    protected CommonContext context;
+    protected JmsBaseConnection connection;
+    protected CopyOnWriteArrayList<JmsBaseMessageConsumer> consumerList =
+        new CopyOnWriteArrayList<JmsBaseMessageConsumer>();
+    private boolean transacted = true;
+    private int acknowledgeMode = AUTO_ACKNOWLEDGE;
+
+    public JmsBaseSession(JmsBaseConnection connection, boolean transacted,
+        int acknowledgeMode, CommonContext context) {
+        this.context = context;
+        this.acknowledgeMode = acknowledgeMode;
+        this.transacted = transacted;
+        this.connection = connection;
+    }
+
+    @Override
+    public BytesMessage createBytesMessage() throws JMSException {
+        return new JmsBytesMessage();
+    }
+
+    @Override
+    public MapMessage createMapMessage() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public Message createMessage() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public ObjectMessage createObjectMessage() throws JMSException {
+        return new JmsObjectMessage();
+    }
+
+    @Override
+    public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
+        return new JmsObjectMessage(object);
+    }
+
+    @Override
+    public StreamMessage createStreamMessage() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public TextMessage createTextMessage() throws JMSException {
+        return new JmsTextMessage();
+    }
+
+    @Override
+    public TextMessage createTextMessage(String text) throws JMSException {
+        return new JmsTextMessage(text);
+    }
+
+    @Override
+    public boolean getTransacted() throws JMSException {
+        return this.transacted;
+    }
+
+    @Override
+    public int getAcknowledgeMode() {
+        return this.acknowledgeMode;
+    }
+
+    @Override
+    public void commit() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public void rollback() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public void close() throws JMSException {
+        for (JmsBaseMessageConsumer messageConsumer : consumerList) {
+            messageConsumer.close();
+        }
+        consumerList.clear();
+    }
+
+    @Override
+    public void recover() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public MessageListener getMessageListener() throws JMSException {
+        return null;
+    }
+
+    @Override
+    public void setMessageListener(MessageListener listener) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public void run() {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public MessageProducer createProducer(Destination destination) throws JMSException {
+        return new JmsBaseMessageProducer(destination, context);
+    }
+
+    /**
+     * Create a MessageConsumer.
+     * <p/>
+     * <P>Create a durable consumer to the specified destination
+     *
+     * @param destination Equals to Topic:MessageType in ROCKETMQ
+     * @throws javax.jms.JMSException
+     * @see <CODE>Destination</CODE>
+     */
+    @Override
+    public MessageConsumer createConsumer(Destination destination) throws JMSException {
+        JmsBaseMessageConsumer messageConsumer = new
+            JmsBaseMessageConsumer(destination, this.context, this.connection);
+        this.consumerList.addIfAbsent(messageConsumer);
+        return messageConsumer;
+    }
+
+    /**
+     * Create a MessageConsumer with messageSelector.
+     * <p/>
+     * <P>ROCKETMQ-JMS do not support using messageSelector to filter messages
+     *
+     * @param destination Equals to Topic:MessageType in ROCKETMQ
+     * @param messageSelector For filtering messages
+     * @throws javax.jms.JMSException
+     * @see <CODE>Destination</CODE>
+     */
+    @Override
+    public MessageConsumer createConsumer(Destination destination, String messageSelector)
+        throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+
+    }
+
+    /**
+     * Create a MessageConsumer with messageSelector.
+     * <p/>
+     * <P>ROCKETMQ-JMS do not support using messageSelector to filter messages and do not support this mechanism to reject
+     * messages from localhost.
+     *
+     * @param destination Equals to Topic:MessageType in ROCKETMQ
+     * @param messageSelector For filtering messages
+     * @param noLocal If true: reject messages from localhost
+     * @throws javax.jms.JMSException
+     * @see <CODE>Destination</CODE>
+     */
+    @Override
+    public MessageConsumer createConsumer(Destination destination, String messageSelector,
+        boolean noLocal) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public Queue createQueue(String queueName) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public Topic createTopic(String topicName) throws JMSException {
+        Preconditions.checkNotNull(topicName);
+        List<String> msgTuple = Arrays.asList(topicName.split(":"));
+
+        Preconditions.checkState(msgTuple.size() >= 1 && msgTuple.size() <= 2,
+            "Destination must match messageTopic:messageType !");
+
+        //If messageType is null, use * instead.
+        if (1 == msgTuple.size()) {
+            return new JmsBaseTopic(msgTuple.get(0), "*");
+        }
+        return new JmsBaseTopic(msgTuple.get(0), msgTuple.get(1));
+    }
+
+    /**
+     * Create a MessageConsumer with durable subscription.
+     * <p/>
+     * <P>When using <CODE>createConsumer(Destination)</CODE> method, one creates a MessageConsumer with a durable
+     * subscription. So use <CODE>createConsumer(Destination)</CODE> instead of these method.
+     *
+     * @param topic destination
+     * @throws javax.jms.JMSException
+     * @see <CODE>Topic</CODE>
+     */
+    @Override
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name)
+        throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    /**
+     * Create a MessageConsumer with durable subscription.
+     * <p/>
+     * <P>When using <CODE>createConsumer(Destination)</CODE> method, one creates a MessageConsumer with a durable
+     * subscription. So use <CODE>createConsumer(Destination)</CODE> instead of these method.
+     *
+     * @param topic destination
+     * @throws javax.jms.JMSException
+     * @see <CODE>Topic</CODE>
+     */
+    @Override
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name,
+        String messageSelector,
+        boolean noLocal) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public QueueBrowser createBrowser(Queue queue) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    @Override
+    public TemporaryQueue createTemporaryQueue() throws JMSException {
+        return new TemporaryQueue() {
+            public void delete() throws JMSException {
+            }
+
+            public String getQueueName() throws JMSException {
+                return UUID.randomUUID().toString();
+            }
+        };
+    }
+
+    @Override
+    public TemporaryTopic createTemporaryTopic() throws JMSException {
+        return new TemporaryTopic() {
+            public void delete() throws JMSException {
+            }
+
+            public String getTopicName() throws JMSException {
+                return UUID.randomUUID().toString();
+            }
+        };
+    }
+
+    @Override
+    public void unsubscribe(String name) throws JMSException {
+        throw new UnsupportedOperationException("Unsupported");
+    }
+
+    public void start() throws JMSException {
+        for (JmsBaseMessageConsumer messageConsumer : consumerList) {
+            messageConsumer.startConsumer();
+        }
+    }
+
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java
new file mode 100644
index 0000000..b7e2fab
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/JmsBaseTopic.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import javax.jms.JMSException;
+import javax.jms.Topic;
+
+public class JmsBaseTopic implements Topic {
+
+    private String messageTopic;
+    private String messageType;
+
+    public JmsBaseTopic(String messageTopic, String messageType) {
+        Preconditions.checkNotNull(messageTopic);
+        Preconditions.checkNotNull(messageType);
+
+        this.messageTopic = messageTopic;
+        this.messageType = messageType;
+    }
+
+    public String getTopicName() throws JMSException {
+        return this.toString();
+    }
+
+    public String toString() {
+        return Joiner.on(":").join(this.getMessageTopic(), this.getMessageType());
+    }
+
+    public String getMessageTopic() {
+        return messageTopic;
+    }
+
+    public String getMessageType() {
+        return messageType;
+    }
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java
new file mode 100644
index 0000000..7a8a9f7
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/RMQPushConsumerExt.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.rocketmq.client.consumer.MQPushConsumer;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.jms.util.MessageConverter;
+
+public class RMQPushConsumerExt {
+    private final MQPushConsumer consumer;
+    private final ConcurrentHashMap<String/* Topic */, javax.jms.MessageListener> subscribeTable = new ConcurrentHashMap<String, javax.jms.MessageListener>();
+
+    private AtomicInteger referenceCount = new AtomicInteger(0);
+    private AtomicBoolean started = new AtomicBoolean(false);
+
+    public RMQPushConsumerExt(MQPushConsumer consumer) {
+        this.consumer = consumer;
+    }
+
+    public MQPushConsumer getConsumer() {
+        return consumer;
+    }
+
+    public int incrementAndGet() {
+        return referenceCount.incrementAndGet();
+    }
+
+    public int decrementAndGet() {
+        return referenceCount.decrementAndGet();
+    }
+
+    public int getReferenceCount() {
+        return referenceCount.get();
+    }
+    public void start() throws MQClientException {
+        if (consumer == null) {
+            throw new MQClientException(-1, "consumer is null");
+        }
+
+        if (this.started.compareAndSet(false, true)) {
+            this.consumer.registerMessageListener(new MessageListenerImpl());
+            this.consumer.start();
+        }
+    }
+
+
+    public void close() {
+        if (this.started.compareAndSet(true, false)) {
+            this.consumer.shutdown();
+        }
+    }
+
+    public void subscribe(String topic, String subExpression, javax.jms.MessageListener listener) throws MQClientException {
+        if (null == topic) {
+            throw new MQClientException(-1, "topic is null");
+        }
+
+        if (null == listener) {
+            throw new MQClientException(-1, "listener is null");
+        }
+
+        try {
+            this.subscribeTable.put(topic, listener);
+            this.consumer.subscribe(topic, subExpression);
+        } catch (MQClientException e) {
+            throw new MQClientException("consumer subscribe exception", e);
+        }
+    }
+
+    public void unsubscribe(String topic) {
+        if (null != topic) {
+            this.consumer.unsubscribe(topic);
+        }
+    }
+
+    class MessageListenerImpl implements MessageListenerConcurrently {
+
+        @Override
+        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgsRMQList, ConsumeConcurrentlyContext contextRMQ) {
+            MessageExt msgRMQ = msgsRMQList.get(0);
+            javax.jms.MessageListener listener = RMQPushConsumerExt.this.subscribeTable.get(msgRMQ.getTopic());
+            if (null == listener) {
+                throw new RuntimeException("MessageListener is null");
+            }
+
+            try {
+                listener.onMessage(MessageConverter.convert2JMSMessage(msgRMQ));
+            }
+            catch (Exception e) {
+                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
+            }
+            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+        }
+    }
+
+
+    public boolean isStarted() {
+        return started.get();
+    }
+
+
+    public boolean isClosed() {
+        return !isStarted();
+    }
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBaseMessage.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBaseMessage.java
new file mode 100644
index 0000000..ea4b49e
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBaseMessage.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import com.google.common.collect.Maps;
+import com.google.common.io.BaseEncoding;
+import java.io.Serializable;
+import java.util.Enumeration;
+import java.util.Map;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.rocketmq.jms.domain.JmsBaseConstant;
+import org.apache.rocketmq.jms.util.ExceptionUtil;
+
+public class JmsBaseMessage implements Message {
+    /**
+     * Message properties
+     */
+    protected Map<String, Object> properties = Maps.newHashMap();
+    /**
+     * Message headers
+     */
+    protected Map<String, Object> headers = Maps.newHashMap();
+    /**
+     * Message body
+     */
+    protected Serializable body;
+
+    @Override
+    public String getJMSMessageID() {
+        return (String) headers.get(JmsBaseConstant.JMS_MESSAGE_ID);
+    }
+
+    /**
+     * Sets the message ID.
+     * <p/>
+     * <P>JMS providers set this field when a message is sent. Do not allow User to set the message ID by yourself.
+     *
+     * @param id the ID of the message
+     * @see javax.jms.Message#getJMSMessageID()
+     */
+
+    @Override
+    public void setJMSMessageID(String id) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public long getJMSTimestamp() {
+        if (headers.containsKey(JmsBaseConstant.JMS_TIMESTAMP)) {
+            return (Long) headers.get(JmsBaseConstant.JMS_TIMESTAMP);
+        }
+        return 0;
+    }
+
+    @Override
+    public void setJMSTimestamp(long timestamp) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public byte[] getJMSCorrelationIDAsBytes() {
+        String jmsCorrelationID = getJMSCorrelationID();
+        if (jmsCorrelationID != null) {
+            try {
+                return BaseEncoding.base64().decode(jmsCorrelationID);
+            }
+            catch (Exception e) {
+                return jmsCorrelationID.getBytes();
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public void setJMSCorrelationIDAsBytes(byte[] correlationID) {
+        String encodedText = BaseEncoding.base64().encode(correlationID);
+        setJMSCorrelationID(encodedText);
+    }
+
+    @Override
+    public String getJMSCorrelationID() {
+        if (headers.containsKey(JmsBaseConstant.JMS_CORRELATION_ID)) {
+            return (String) headers.get(JmsBaseConstant.JMS_CORRELATION_ID);
+        }
+        return null;
+    }
+
+    @Override
+    public void setJMSCorrelationID(String correlationID) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public Destination getJMSReplyTo() {
+        if (headers.containsKey(JmsBaseConstant.JMS_REPLY_TO)) {
+            return (Destination) headers.get(JmsBaseConstant.JMS_REPLY_TO);
+        }
+        return null;
+    }
+
+    @Override
+    public void setJMSReplyTo(Destination replyTo) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @Override
+    public String toString() {
+        return ToStringBuilder.reflectionToString(this);
+    }
+
+    @Override
+    public Destination getJMSDestination() {
+        if (headers.containsKey(JmsBaseConstant.JMS_DESTINATION)) {
+            return (Destination) headers.get(JmsBaseConstant.JMS_DESTINATION);
+        }
+        return null;
+    }
+
+    @Override
+    public void setJMSDestination(Destination destination) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> T getBody(Class<T> clazz) throws JMSException {
+        if (clazz.isInstance(body)) {
+            return (T) body;
+        }
+        else {
+            throw new IllegalArgumentException("The class " + clazz
+                + " is unknown to this implementation");
+        }
+    }
+
+    @Override
+    public int getJMSDeliveryMode() {
+        if (headers.containsKey(JmsBaseConstant.JMS_DELIVERY_MODE)) {
+            return (Integer) headers.get(JmsBaseConstant.JMS_DELIVERY_MODE);
+        }
+        return 0;
+    }
+
+    /**
+     * Sets the <CODE>DeliveryMode</CODE> value for this message.
+     * <p/>
+     * <P>JMS providers set this field when a message is sent. ROCKETMQ only support DeliveryMode.PERSISTENT mode. So do not
+     * allow User to set this by yourself, but you can get the default mode by <CODE>getJMSDeliveryMode</CODE> method.
+     *
+     * @param deliveryMode the delivery mode for this message
+     * @see javax.jms.Message#getJMSDeliveryMode()
+     * @see javax.jms.DeliveryMode
+     */
+
+    @Override
+    public void setJMSDeliveryMode(int deliveryMode) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public boolean isBodyAssignableTo(Class<?> clazz) throws JMSException {
+        return clazz.isInstance(body);
+    }
+
+    @Override
+    public boolean getJMSRedelivered() {
+        return headers.containsKey(JmsBaseConstant.JMS_REDELIVERED)
+            && (Boolean) headers.get(JmsBaseConstant.JMS_REDELIVERED);
+    }
+
+    @Override
+    public void setJMSRedelivered(boolean redelivered) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    /**
+     * copy meta data from source message
+     *
+     * @param sourceMessage source message
+     */
+    public void copyMetaData(JmsBaseMessage sourceMessage) {
+        if (!sourceMessage.getHeaders().isEmpty()) {
+            for (Map.Entry<String, Object> entry : sourceMessage.getHeaders().entrySet()) {
+                if (!headerExits(entry.getKey())) {
+                    setHeader(entry.getKey(), entry.getValue());
+                }
+            }
+        }
+        if (!sourceMessage.getProperties().isEmpty()) {
+            for (Map.Entry<String, Object> entry : sourceMessage.getProperties().entrySet()) {
+                if (!propertyExists(entry.getKey())) {
+                    setObjectProperty(entry.getKey(), entry.getValue());
+                }
+            }
+        }
+    }
+
+    @Override
+    public String getJMSType() {
+        return (String) headers.get(JmsBaseConstant.JMS_TYPE);
+    }
+
+    @Override
+    public void setJMSType(String type) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public Map<String, Object> getHeaders() {
+        return this.headers;
+    }
+
+    @Override
+    public long getJMSExpiration() {
+        if (headers.containsKey(JmsBaseConstant.JMS_EXPIRATION)) {
+            return (Long) headers.get(JmsBaseConstant.JMS_EXPIRATION);
+        }
+        return 0;
+    }
+
+    @Override
+    public void setJMSExpiration(long expiration) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public boolean headerExits(String name) {
+        return this.headers.containsKey(name);
+    }
+
+    @Override
+    public int getJMSPriority() {
+        if (headers.containsKey(JmsBaseConstant.JMS_PRIORITY)) {
+            return (Integer) headers.get(JmsBaseConstant.JMS_PRIORITY);
+        }
+        return 5;
+    }
+
+    @Override
+    public void setJMSPriority(int priority) {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void setHeader(String name, Object value) {
+        this.headers.put(name, value);
+    }
+
+    public Map<String, Object> getProperties() {
+        return this.properties;
+    }
+
+    public void setProperties(Map<String, Object> properties) {
+        this.properties = properties;
+    }
+
+    @Override
+    public void acknowledge() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    @Override
+    public void clearProperties() {
+        this.properties.clear();
+    }
+
+    @Override
+    public void clearBody() {
+        this.body = null;
+    }
+
+    @Override
+    public boolean propertyExists(String name) {
+        return properties.containsKey(name);
+    }
+
+    @Override
+    public boolean getBooleanProperty(String name) throws JMSException {
+        if (propertyExists(name)) {
+            Object value = getObjectProperty(name);
+            return value instanceof Boolean ? (Boolean) value : Boolean.valueOf(value.toString());
+        }
+        return false;
+    }
+
+    @Override
+    public byte getByteProperty(String name) throws JMSException {
+        if (propertyExists(name)) {
+            Object value = getObjectProperty(name);
+            return value instanceof Byte ? (Byte) value : Byte.valueOf(value.toString());
+        }
+        return 0;
+    }
+
+    @Override
+    public short getShortProperty(String name) throws JMSException {
+        if (propertyExists(name)) {
+            Object value = getObjectProperty(name);
+            return value instanceof Short ? (Short) value : Short.valueOf(value.toString());
+        }
+        return 0;
+    }
+
+    @Override
+    public int getIntProperty(String name) throws JMSException {
+        if (propertyExists(name)) {
+            Object value = getObjectProperty(name);
+            return value instanceof Integer ? (Integer) value : Integer.valueOf(value.toString());
+        }
+        return 0;
+    }
+
+    @Override
+    public long getLongProperty(String name) throws JMSException {
+        if (propertyExists(name)) {
+            Object value = getObjectProperty(name);
+            return value instanceof Long ? (Long) value : Long.valueOf(value.toString());
+        }
+        return 0L;
+    }
+
+    @Override
+    public float getFloatProperty(String name) throws JMSException {
+        if (propertyExists(name)) {
+            Object value = getObjectProperty(name);
+            return value instanceof Float ? (Float) value : Float.valueOf(value.toString());
+        }
+        return 0f;
+    }
+
+    @Override
+    public double getDoubleProperty(String name) throws JMSException {
+        if (propertyExists(name)) {
+            Object value = getObjectProperty(name);
+            return value instanceof Double ? (Double) value : Double.valueOf(value.toString());
+        }
+        return 0d;
+    }
+
+    @Override
+    public String getStringProperty(String name) throws JMSException {
+        if (propertyExists(name)) {
+            return getObjectProperty(name).toString();
+        }
+        return null;
+    }
+
+    @Override
+    public Object getObjectProperty(String name) throws JMSException {
+        return this.properties.get(name);
+    }
+
+    @Override
+    public Enumeration<?> getPropertyNames() throws JMSException {
+        final Object[] keys = this.properties.keySet().toArray();
+        return new Enumeration<Object>() {
+            int i;
+
+            @Override
+            public boolean hasMoreElements() {
+                return i < keys.length;
+            }
+
+            @Override
+            public Object nextElement() {
+                return keys[i++];
+            }
+        };
+    }
+
+    @Override
+    public void setBooleanProperty(String name, boolean value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setByteProperty(String name, byte value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setShortProperty(String name, short value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setIntProperty(String name, int value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setLongProperty(String name, long value) {
+        setObjectProperty(name, value);
+    }
+
+    public void setFloatProperty(String name, float value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setDoubleProperty(String name, double value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setStringProperty(String name, String value) {
+        setObjectProperty(name, value);
+    }
+
+    @Override
+    public void setObjectProperty(String name, Object value) {
+        if (value instanceof Number || value instanceof String || value instanceof Boolean) {
+            this.properties.put(name, value);
+        }
+        else {
+            throw new IllegalArgumentException(
+                "Value should be boolean, byte, short, int, long, float, double, and String.");
+        }
+    }
+
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessage.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessage.java
new file mode 100644
index 0000000..b1e85b0
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessage.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotReadableException;
+import javax.jms.MessageNotWriteableException;
+import org.apache.rocketmq.jms.util.ExceptionUtil;
+
+/**
+ * The <CODE>BytesMessage</CODE> methods are based largely on those found in <CODE>java.io.DataInputStream</CODE> and
+ * <CODE>java.io.DataOutputStream</CODE>. <P> Notice:Although the JMS API allows the use of message properties with byte
+ * messages, they are typically not used, since the inclusion of properties may affect the format. <P>
+ */
+public class JmsBytesMessage extends JmsBaseMessage implements BytesMessage {
+    private DataInputStream dataAsInput;
+    private DataOutputStream dataAsOutput;
+    private ByteArrayOutputStream bytesOut;
+    private byte[] bytesIn;
+
+    /**
+     * Message created for reading
+     *
+     * @param data
+     */
+    public JmsBytesMessage(byte[] data) {
+        this.bytesIn = data;
+        dataAsInput = new DataInputStream(new ByteArrayInputStream(data, 0, data.length));
+    }
+
+    /**
+     * Message created to be sent
+     */
+    public JmsBytesMessage() {
+        bytesOut = new ByteArrayOutputStream();
+        dataAsOutput = new DataOutputStream(bytesOut);
+    }
+
+    public long getBodyLength() throws JMSException {
+        return getData().length;
+    }
+
+    /**
+     * @return the data
+     */
+    public byte[] getData() {
+        if (bytesOut != null) {
+            return bytesOut.toByteArray();
+        }
+        else {
+            return bytesIn;
+        }
+
+    }
+
+    public boolean readBoolean() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public byte readByte() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public int readUnsignedByte() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public short readShort() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public int readUnsignedShort() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public char readChar() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public int readInt() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public long readLong() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public float readFloat() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public double readDouble() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public String readUTF() throws JMSException {
+        throw new UnsupportedOperationException("Unsupported!");
+    }
+
+    public int readBytes(byte[] value) throws JMSException {
+        return readBytes(value, value.length);
+    }
+
+    public int readBytes(byte[] value, int length) throws JMSException {
+        if (length > value.length) {
+            throw new IndexOutOfBoundsException("length must be smaller than the length of value");
+        }
+        if (dataAsInput == null) {
+            throw new MessageNotReadableException("Message is not readable! ");
+        }
+        try {
+            int offset = 0;
+            while (offset < length) {
+                int read = dataAsInput.read(value, offset, length - offset);
+                if (read < 0) {
+                    break;
+                }
+                offset += read;
+            }
+
+            if (offset == 0 && length != 0) {
+                return -1;
+            }
+            else {
+                return offset;
+            }
+        }
+        catch (IOException e) {
+            throw handleInputException(e);
+        }
+
+    }
+
+    public void writeBoolean(boolean value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeByte(byte value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeShort(short value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeChar(char value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeInt(int value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeLong(long value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeFloat(float value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeDouble(double value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeUTF(String value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void writeBytes(byte[] value) throws JMSException {
+        if (dataAsOutput == null) {
+            throw new MessageNotWriteableException("Message is not writable! ");
+        }
+        try {
+            dataAsOutput.write(value);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    public void writeBytes(byte[] value, int offset, int length) throws JMSException {
+        if (dataAsOutput == null) {
+            throw new MessageNotWriteableException("Message is not writable! ");
+        }
+        try {
+            dataAsOutput.write(value, offset, length);
+        }
+        catch (IOException e) {
+            throw handleOutputException(e);
+        }
+    }
+
+    public void writeObject(Object value) throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    public void reset() throws JMSException {
+        ExceptionUtil.handleUnSupportedException();
+    }
+
+    private JMSException handleOutputException(final IOException e) {
+        JMSException ex = new JMSException(e.getMessage());
+        ex.initCause(e);
+        ex.setLinkedException(e);
+        return ex;
+    }
+
+    private JMSException handleInputException(final IOException e) {
+        JMSException ex;
+        if (e instanceof EOFException) {
+            ex = new MessageEOFException(e.getMessage());
+        }
+        else {
+            ex = new MessageFormatException(e.getMessage());
+        }
+        ex.initCause(e);
+        ex.setLinkedException(e);
+        return ex;
+    }
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessage.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessage.java
new file mode 100644
index 0000000..f67da14
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessage.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import java.io.Serializable;
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+
+public class JmsObjectMessage extends JmsBaseMessage implements ObjectMessage {
+
+    public JmsObjectMessage(Serializable object) {
+        this.body = object;
+    }
+
+    public JmsObjectMessage() {
+
+    }
+
+    public Serializable getObject() throws JMSException {
+        return this.body;
+    }
+
+    public void setObject(Serializable object) throws JMSException {
+        this.body = object;
+    }
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsTextMessage.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsTextMessage.java
new file mode 100644
index 0000000..ce19b51
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/domain/message/JmsTextMessage.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import javax.jms.JMSException;
+import javax.jms.TextMessage;
+
+public class JmsTextMessage extends JmsBaseMessage implements TextMessage {
+    private String text;
+
+    public JmsTextMessage() {
+
+    }
+
+    public JmsTextMessage(String text) {
+        setText(text);
+    }
+
+    public void clearBody() {
+        this.text = null;
+        super.clearBody();
+    }
+
+    public String getText() throws JMSException {
+        return this.text;
+    }
+
+    public void setText(String text) {
+        this.body = text;
+        this.text = text;
+    }
+
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/ExceptionUtil.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/ExceptionUtil.java
new file mode 100644
index 0000000..bd926e5
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/ExceptionUtil.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.util;
+
+import com.google.common.base.Preconditions;
+import javax.jms.JMSException;
+
+public class ExceptionUtil {
+    public static final boolean SKIP_SET_EXCEPTION
+        = Boolean.parseBoolean(System.getProperty("skip.set.exception", "false"));
+
+    public static void handleUnSupportedException() {
+        if (!ExceptionUtil.SKIP_SET_EXCEPTION) {
+            throw new UnsupportedOperationException("Operation unsupported! If you want to skip this Exception," +
+                " use '-Dskip.set.exception=true' in JVM options.");
+        }
+    }
+
+    public static JMSException convertToJmsException(Exception e, String extra) {
+        Preconditions.checkNotNull(extra);
+        Preconditions.checkNotNull(e);
+        JMSException jmsException = new JMSException(extra);
+        jmsException.initCause(e);
+        return jmsException;
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MessageConverter.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MessageConverter.java
new file mode 100644
index 0000000..3cf03f9
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MessageConverter.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.util;
+
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import javax.jms.BytesMessage;
+import javax.jms.ObjectMessage;
+import javax.jms.TextMessage;
+import org.apache.commons.lang.StringUtils;
+import org.apache.rocketmq.jms.domain.JmsBaseConstant;
+import org.apache.rocketmq.jms.domain.JmsBaseTopic;
+import org.apache.rocketmq.jms.domain.message.JmsBaseMessage;
+import org.apache.rocketmq.jms.domain.message.JmsBytesMessage;
+import org.apache.rocketmq.jms.domain.message.JmsObjectMessage;
+import org.apache.rocketmq.jms.domain.message.JmsTextMessage;
+
+import static org.apache.rocketmq.jms.domain.JmsBaseMessageProducer.initRocketMQHeaders;
+
+public class MessageConverter {
+    public static byte[] getContentFromJms(javax.jms.Message jmsMessage) throws Exception {
+        byte[] content;
+        if (jmsMessage instanceof TextMessage) {
+            if (StringUtils.isEmpty(((TextMessage) jmsMessage).getText())) {
+                throw new IllegalArgumentException("Message body length is zero");
+            }
+            content = MsgConvertUtil.string2Bytes(((TextMessage) jmsMessage).getText(),
+                Charsets.UTF_8.toString());
+        }
+        else if (jmsMessage instanceof ObjectMessage) {
+            if (((ObjectMessage) jmsMessage).getObject() == null) {
+                throw new IllegalArgumentException("Message body length is zero");
+            }
+            content = MsgConvertUtil.objectSerialize(((ObjectMessage) jmsMessage).getObject());
+        }
+        else if (jmsMessage instanceof BytesMessage) {
+            JmsBytesMessage bytesMessage = (JmsBytesMessage) jmsMessage;
+            if (bytesMessage.getBodyLength() == 0) {
+                throw new IllegalArgumentException("Message body length is zero");
+            }
+            content = bytesMessage.getData();
+        }
+        else {
+            throw new IllegalArgumentException("Unknown message type " + jmsMessage.getJMSType());
+        }
+
+        return content;
+    }
+
+    public static JmsBaseMessage convert2JMSMessage(MessageExt msg) throws Exception {
+        JmsBaseMessage message;
+        if (MsgConvertUtil.MSGMODEL_BYTES.equals(
+            msg.getUserProperty(MsgConvertUtil.JMS_MSGMODEL))) {
+            message = new JmsBytesMessage(msg.getBody());
+        }
+        else if (MsgConvertUtil.MSGMODEL_OBJ.equals(
+            msg.getUserProperty(MsgConvertUtil.JMS_MSGMODEL))) {
+            message = new JmsObjectMessage(MsgConvertUtil.objectDeserialize(msg.getBody()));
+        }
+        else if (MsgConvertUtil.MSGMODEL_TEXT.equals(
+            msg.getUserProperty(MsgConvertUtil.JMS_MSGMODEL))) {
+            message = new JmsTextMessage(MsgConvertUtil.bytes2String(msg.getBody(),
+                Charsets.UTF_8.toString()));
+        }
+        else {
+            // rocketmq producer sends bytesMessage without setting JMS_MSGMODEL.
+            message = new JmsBytesMessage(msg.getBody());
+        }
+
+        //-------------------------set headers-------------------------
+        Map<String, Object> properties = new HashMap<String, Object>();
+
+        message.setHeader(JmsBaseConstant.JMS_MESSAGE_ID, "ID:" + msg.getMsgId());
+
+        if (msg.getReconsumeTimes() > 0) {
+            message.setHeader(JmsBaseConstant.JMS_REDELIVERED, Boolean.TRUE);
+        }
+        else {
+            message.setHeader(JmsBaseConstant.JMS_REDELIVERED, Boolean.FALSE);
+        }
+
+        Map<String, String> propertiesMap = msg.getProperties();
+        if (propertiesMap != null) {
+            for (String properName : propertiesMap.keySet()) {
+                String properValue = propertiesMap.get(properName);
+                if (JmsBaseConstant.JMS_DESTINATION.equals(properName)) {
+                    String destinationStr = properValue;
+                    if (null != destinationStr) {
+                        List<String> msgTuple = Arrays.asList(destinationStr.split(":"));
+                        message.setHeader(JmsBaseConstant.JMS_DESTINATION,
+                            new JmsBaseTopic(msgTuple.get(0), msgTuple.get(1)));
+                    }
+                }
+                else if (JmsBaseConstant.JMS_DELIVERY_MODE.equals(properName) ||
+                    JmsBaseConstant.JMS_PRIORITY.equals(properName)) {
+                    message.setHeader(properName, properValue);
+                }
+                else if (JmsBaseConstant.JMS_TIMESTAMP.equals(properName) ||
+                    JmsBaseConstant.JMS_EXPIRATION.equals(properName)) {
+                    message.setHeader(properName, properValue);
+                }
+                else if (JmsBaseConstant.JMS_CORRELATION_ID.equals(properName) ||
+                    JmsBaseConstant.JMS_TYPE.equals(properName)) {
+                    message.setHeader(properName, properValue);
+                }
+                else if (JmsBaseConstant.JMS_MESSAGE_ID.equals(properName) ||
+                    JmsBaseConstant.JMS_REDELIVERED.equals(properName)) {
+                    //JMS_MESSAGE_ID should set by msg.getMsgID()
+                    continue;
+                }
+                else {
+                    properties.put(properName, properValue);
+                }
+            }
+        }
+
+        //Handle System properties, put into header.
+        //add what?
+        message.setProperties(properties);
+
+        return message;
+    }
+
+    public static Message convert2RMQMessage(JmsBaseMessage jmsMsg) throws Exception {
+        Message rocketmqMsg = new MessageExt();
+        // 1. Transform message body
+        rocketmqMsg.setBody(MessageConverter.getContentFromJms(jmsMsg));
+
+        // 2. Transform topic and messageType
+        JmsBaseTopic destination = (JmsBaseTopic) jmsMsg.getHeaders().get(JmsBaseConstant.JMS_DESTINATION);
+        String topic = destination.getMessageTopic();
+        rocketmqMsg.setTopic(topic);
+        String messageType = destination.getMessageType();
+        Preconditions.checkState(!messageType.contains("||"),
+            "'||' can not be in the destination when sending a message");
+        rocketmqMsg.setTags(messageType);
+
+        // 3. Transform message properties
+        Properties properties = initRocketMQHeaders(jmsMsg, topic, messageType);
+        for (String name : properties.stringPropertyNames()) {
+            String value = properties.getProperty(name);
+            if (MessageConst.PROPERTY_KEYS.equals(name)) {
+                rocketmqMsg.setKeys(value);
+            } else if (MessageConst.PROPERTY_TAGS.equals(name)) {
+                rocketmqMsg.setTags(value);
+            } else if (MessageConst.PROPERTY_DELAY_TIME_LEVEL.equals(name)) {
+                rocketmqMsg.setDelayTimeLevel(Integer.parseInt(value));
+            } else if (MessageConst.PROPERTY_WAIT_STORE_MSG_OK.equals(name)) {
+                rocketmqMsg.setWaitStoreMsgOK(Boolean.parseBoolean(value));
+            } else if (MessageConst.PROPERTY_BUYER_ID.equals(name)) {
+                rocketmqMsg.setBuyerId(value);
+            } else {
+                rocketmqMsg.putUserProperty(name, value);
+            }
+        }
+
+        return rocketmqMsg;
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MsgConvertUtil.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MsgConvertUtil.java
new file mode 100644
index 0000000..ec55bbc
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/MsgConvertUtil.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.util;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+public class MsgConvertUtil {
+
+    public static final byte[] EMPTY_BYTES = new byte[0];
+    public static final String EMPTY_STRING = "";
+
+    public static final String JMS_MSGMODEL = "jmsMsgModel";
+    /**
+     * To adapt this scene: "Notify client try to receive ObjectMessage sent by JMS client" Set notify out message
+     * model, value can be textMessage OR objectMessage
+     */
+    public static final String COMPATIBLE_FIELD_MSGMODEL = "notifyOutMsgModel";
+
+    public static final String MSGMODEL_TEXT = "textMessage";
+    public static final String MSGMODEL_BYTES = "bytesMessage";
+    public static final String MSGMODEL_OBJ = "objectMessage";
+
+    public static final String MSG_TOPIC = "msgTopic";
+    public static final String MSG_TYPE = "msgType";
+
+    public static byte[] objectSerialize(Object object) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        oos.writeObject(object);
+        oos.close();
+        baos.close();
+        return baos.toByteArray();
+    }
+
+    public static Serializable objectDeserialize(byte[] bytes) throws IOException, ClassNotFoundException {
+        ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+        ObjectInputStream ois = new ObjectInputStream(bais);
+        ois.close();
+        bais.close();
+        return (Serializable) ois.readObject();
+    }
+
+    public static final byte[] string2Bytes(String s, String charset) {
+        if (null == s) {
+            return EMPTY_BYTES;
+        }
+        byte[] bs = null;
+        try {
+            bs = s.getBytes(charset);
+        }
+        catch (Exception e) {
+            // ignore
+        }
+        return bs;
+    }
+
+    public static final String bytes2String(byte[] bs, String charset) {
+        if (null == bs) {
+            return EMPTY_STRING;
+        }
+        String s = null;
+        try {
+            s = new String(bs, charset);
+        }
+        catch (Exception e) {
+            // ignore
+        }
+        return s;
+    }
+}
diff --git a/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/URISpecParser.java b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/URISpecParser.java
new file mode 100644
index 0000000..9b29928
--- /dev/null
+++ b/rocketmq-jms/core/src/main/java/org/apache/rocketmq/jms/util/URISpecParser.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import java.util.Map;
+import org.apache.commons.lang.StringUtils;
+import org.apache.rocketmq.jms.domain.CommonConstant;
+
+public abstract class URISpecParser {
+
+    private static final String DEFAULT_BROKER = "rocketmq";
+
+    /**
+     * ConnectionUrl spec is broker://ip:port?key1=value1&key2=value2
+     *
+     * @param uri Just like broker://ip:port?key1=value1&key2=value2
+     * @return The parameters' map
+     */
+    public static Map<String, String> parseURI(String uri) {
+        Preconditions.checkArgument(null != uri && !uri.trim().isEmpty(), "Uri can not be empty!");
+
+        Map<String, String> results = Maps.newHashMap();
+        String broker = uri.substring(0, uri.indexOf(":"));
+        results.put(CommonConstant.PROVIDER, broker);
+
+        if (broker.equals(DEFAULT_BROKER)) {
+            //Special handle for alibaba inner mq broker
+            String queryStr = uri.substring(uri.indexOf("?") + 1, uri.length());
+            if (StringUtils.isNotEmpty(queryStr)) {
+                String[] params = queryStr.split("&");
+                for (String param : params) {
+                    if (param.contains("=")) {
+                        String[] values = param.split("=", 2);
+                        results.put(values[0], values[1]);
+                    }
+                }
+            }
+        }
+        else {
+            throw new IllegalArgumentException("Broker must be rocketmq");
+        }
+        return results;
+    }
+}
diff --git a/rocketmq-jms/core/src/main/resources/application.conf b/rocketmq-jms/core/src/main/resources/application.conf
new file mode 100644
index 0000000..713c915
--- /dev/null
+++ b/rocketmq-jms/core/src/main/resources/application.conf
@@ -0,0 +1 @@
+version = ${project.version}
\ No newline at end of file
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestListener.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestListener.java
new file mode 100644
index 0000000..d77b13e
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestListener.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import org.junit.Assert;
+
+public class JmsTestListener implements MessageListener {
+
+    private int expectd;
+    private CountDownLatch latch;
+    private AtomicInteger consumedNum = new AtomicInteger(0);
+
+    public JmsTestListener() {
+        this.expectd = 10;
+    }
+    public JmsTestListener(int expectd) {
+        this.expectd = expectd;
+    }
+    public JmsTestListener(int expected, CountDownLatch latch) {
+        this.expectd = expected;
+        this.latch = latch;
+    }
+    @Override
+    public void onMessage(Message message) {
+        try {
+            Assert.assertNotNull(message);
+            Assert.assertNotNull(message.getJMSMessageID());
+            if (consumedNum.incrementAndGet() == expectd && latch != null) {
+                latch.countDown();
+            }
+        }
+        catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public int getConsumedNum() {
+        return consumedNum.get();
+    }
+
+    public void setLatch(CountDownLatch latch) {
+        this.latch = latch;
+    }
+
+    public void setExpectd(int expectd) {
+        this.expectd = expectd;
+    }
+}
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestUtil.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestUtil.java
new file mode 100644
index 0000000..855cb19
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/JmsTestUtil.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.rocketmq.client.producer.MQProducer;
+import org.apache.rocketmq.jms.domain.JmsBaseMessageConsumer;
+import org.apache.rocketmq.jms.domain.JmsBaseMessageProducer;
+import org.apache.rocketmq.jms.domain.RMQPushConsumerExt;
+import org.junit.Assert;
+
+public class JmsTestUtil {
+    public static MQProducer getMQProducer(String producerId) throws Exception {
+        Assert.assertNotNull(producerId);
+        Field field = JmsBaseMessageProducer.class.getDeclaredField("producerMap");
+        field.setAccessible(true);
+        ConcurrentMap<String, MQProducer> producerMap = (ConcurrentMap<String, MQProducer>) field.get(null);
+        return  producerMap.get(producerId);
+    }
+    public static RMQPushConsumerExt getRMQPushConsumerExt(String consumerId) throws Exception {
+        Assert.assertNotNull(consumerId);
+        Field field = JmsBaseMessageConsumer.class.getDeclaredField("consumerMap");
+        field.setAccessible(true);
+        ConcurrentMap<String, RMQPushConsumerExt> consumerMap = (ConcurrentMap<String, RMQPushConsumerExt>) field.get(null);
+        return  consumerMap.get(consumerId);
+    }
+    public static void checkConsumerState(String consumerId, boolean isNull, boolean isStarted) throws Exception {
+        RMQPushConsumerExt rmqPushConsumerExt = getRMQPushConsumerExt(consumerId);
+        if (isNull) {
+            Assert.assertNull(rmqPushConsumerExt);
+        } else  {
+            Assert.assertNotNull(rmqPushConsumerExt);
+            Assert.assertEquals(isStarted, rmqPushConsumerExt.isStarted());
+        }
+    }
+
+
+}
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessageTest.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessageTest.java
new file mode 100644
index 0000000..9fe9f5e
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsBytesMessageTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JmsBytesMessageTest {
+
+    private byte[] receiveData = "receive data test".getBytes();
+    private byte[] sendData = "send data test".getBytes();
+
+    @Test
+    public void testGetData() throws Exception {
+        JmsBytesMessage readMessage = new JmsBytesMessage(receiveData);
+
+        System.out.println(new String(readMessage.getData()));
+        Assert.assertEquals(new String(receiveData), new String(readMessage.getData()));
+
+        JmsBytesMessage sendMessage = new JmsBytesMessage();
+        sendMessage.writeBytes(sendData, 0, sendData.length);
+
+        System.out.println(new String(sendMessage.getData()));
+        Assert.assertEquals(new String(sendData), new String(sendMessage.getData()));
+
+    }
+
+    @Test
+    public void testGetBodyLength() throws Exception {
+
+        JmsBytesMessage bytesMessage = new JmsBytesMessage(receiveData);
+
+        System.out.println(bytesMessage.getBodyLength());
+        Assert.assertEquals(bytesMessage.getBodyLength(), receiveData.length);
+    }
+
+    @Test
+    public void testReadBytes() throws Exception {
+        JmsBytesMessage bytesMessage = new JmsBytesMessage(receiveData);
+
+        Assert.assertEquals(bytesMessage.getBodyLength(), receiveData.length);
+        byte[] receiveValue = new byte[receiveData.length];
+        bytesMessage.readBytes(receiveValue);
+
+        System.out.println(new String(receiveValue));
+        Assert.assertEquals(new String(receiveValue), new String(receiveData));
+
+    }
+
+    @Test
+    public void testReadBytes1() throws Exception {
+        JmsBytesMessage bytesMessage = new JmsBytesMessage(receiveData);
+
+        byte[] receiveValue1 = new byte[2];
+        bytesMessage.readBytes(receiveValue1, 2);
+        System.out.println(new String(receiveValue1));
+        Assert.assertEquals(new String(receiveData).substring(0, 2), new String(receiveValue1));
+
+        byte[] receiceValue2 = new byte[2];
+        bytesMessage.readBytes(receiceValue2, 2);
+        System.out.println(new String(receiceValue2));
+        Assert.assertEquals(new String(receiveData).substring(2, 4), new String(receiceValue2));
+
+    }
+
+    @Test
+    public void testWriteBytes() throws Exception {
+        JmsBytesMessage jmsBytesMessage = new JmsBytesMessage();
+        jmsBytesMessage.writeBytes(sendData);
+
+        System.out.println(new String(jmsBytesMessage.getData()));
+        Assert.assertEquals(new String(jmsBytesMessage.getData()), new String(sendData));
+
+    }
+
+    @Test
+    public void testException() throws Exception {
+        JmsBytesMessage jmsBytesMessage = new JmsBytesMessage();
+
+        byte[] receiveValue = new byte[receiveData.length];
+//        Throws out NullPointerException
+//        jmsBytesMessage.readBytes(receiveValue);
+
+        JmsBytesMessage sendMessage = new JmsBytesMessage(sendData);
+//        Throws out NullPointerException
+//        sendMessage.writeBytes("hello again".getBytes());
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsMessageConvertTest.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsMessageConvertTest.java
new file mode 100644
index 0000000..b570142
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsMessageConvertTest.java
@@ -0,0 +1,52 @@
+package org.apache.rocketmq.jms.domain.message;
+
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.jms.domain.JmsBaseConstant;
+import org.apache.rocketmq.jms.domain.JmsBaseTopic;
+import org.apache.rocketmq.jms.util.MessageConverter;
+import org.apache.rocketmq.jms.util.MsgConvertUtil;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JmsMessageConvertTest {
+    @Test
+    public void testCovert2RMQ() throws Exception {
+        //init jmsBaseMessage
+        String topic = "TestTopic";
+        String messageType = "TagA";
+
+        JmsBaseMessage jmsBaseMessage = new JmsTextMessage("testText");
+        jmsBaseMessage.setHeader(JmsBaseConstant.JMS_DESTINATION, new JmsBaseTopic(topic, messageType));
+        jmsBaseMessage.setHeader(JmsBaseConstant.JMS_MESSAGE_ID, "ID:null");
+        jmsBaseMessage.setHeader(JmsBaseConstant.JMS_REDELIVERED, Boolean.FALSE);
+
+        jmsBaseMessage.setObjectProperty(MsgConvertUtil.JMS_MSGMODEL, MsgConvertUtil.MSGMODEL_TEXT);
+        jmsBaseMessage.setObjectProperty(MsgConvertUtil.MSG_TOPIC, topic);
+        jmsBaseMessage.setObjectProperty(MsgConvertUtil.MSG_TYPE, messageType);
+        jmsBaseMessage.setObjectProperty(MessageConst.PROPERTY_TAGS, messageType);
+        jmsBaseMessage.setObjectProperty(MessageConst.PROPERTY_KEYS, messageType);
+
+        //convert to RMQMessage
+        MessageExt message = (MessageExt)MessageConverter.convert2RMQMessage(jmsBaseMessage);
+
+        System.out.println(message);
+
+        //then convert back to jmsBaseMessage
+        JmsBaseMessage jmsBaseMessageBack = MessageConverter.convert2JMSMessage(message);
+
+        JmsTextMessage jmsTextMessage = (JmsTextMessage) jmsBaseMessage;
+        JmsTextMessage jmsTextMessageBack = (JmsTextMessage) jmsBaseMessageBack;
+
+        Assert.assertEquals(jmsTextMessage.getText(), jmsTextMessageBack.getText());
+        Assert.assertEquals(jmsTextMessage.getJMSDestination().toString(), jmsTextMessageBack.getJMSDestination().toString());
+        Assert.assertEquals(jmsTextMessage.getJMSMessageID(), jmsTextMessageBack.getJMSMessageID());
+        Assert.assertEquals(jmsTextMessage.getJMSRedelivered(), jmsTextMessageBack.getJMSRedelivered());
+        Assert.assertEquals(jmsTextMessage.getHeaders().get(MsgConvertUtil.JMS_MSGMODEL), jmsTextMessageBack.getHeaders().get(MsgConvertUtil.JMS_MSGMODEL));
+        Assert.assertEquals(jmsTextMessage.getHeaders().get(MsgConvertUtil.MSG_TOPIC), jmsTextMessageBack.getHeaders().get(MsgConvertUtil.MSG_TOPIC));
+        Assert.assertEquals(jmsTextMessage.getHeaders().get(MsgConvertUtil.MSG_TYPE), jmsTextMessageBack.getHeaders().get(MsgConvertUtil.MSG_TYPE));
+        Assert.assertEquals(jmsTextMessage.getHeaders().get(MessageConst.PROPERTY_TAGS), jmsTextMessageBack.getHeaders().get(MessageConst.PROPERTY_TAGS));
+        Assert.assertEquals(jmsTextMessage.getHeaders().get(MessageConst.PROPERTY_KEYS), jmsTextMessageBack.getHeaders().get(MessageConst.PROPERTY_KEYS));
+
+    }
+}
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessageTest.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessageTest.java
new file mode 100644
index 0000000..6951976
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsObjectMessageTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import java.io.Serializable;
+import javax.jms.JMSException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JmsObjectMessageTest {
+
+    @Test
+    public void testGetObject() {
+        JmsObjectMessage jmsObjectMessage = new JmsObjectMessage(new User("jack", 20));
+        try {
+            Assert.assertEquals(jmsObjectMessage.getObject(), new User("jack", 20));
+        }
+        catch (JMSException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void testGetBody() {
+        JmsObjectMessage jmsObjectMessage = new JmsObjectMessage(new User("jack", 20));
+
+        try {
+            User user = (User)jmsObjectMessage.getBody(Object.class);
+            System.out.println(user.getName() + ": " + user.getAge());
+            Assert.assertEquals(jmsObjectMessage.getBody(Object.class), jmsObjectMessage.getObject());
+        }
+        catch (JMSException e) {
+            e.printStackTrace();
+        }
+    }
+
+    private class User implements Serializable {
+        private String name;
+        private int age;
+
+        private User(String name, int age) {
+            this.name = name;
+            this.age = age;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj)
+                return true;
+            if (obj == null || getClass() != obj.getClass())
+                return false;
+
+            User user = (User)obj;
+            if (age != user.getAge())
+                return false;
+            if (name != null ? !name.equals(user.getName()) : user.getName() != null)
+                return false;
+            return true;
+        }
+
+        public int getAge() {
+            return age;
+        }
+
+        public void setAge(int age) {
+            this.age = age;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+    }
+}
\ No newline at end of file
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsTextMessageTest.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsTextMessageTest.java
new file mode 100644
index 0000000..d3c8287
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/domain/message/JmsTextMessageTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.domain.message;
+
+import javax.jms.JMSException;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JmsTextMessageTest {
+    private String text = "jmsTextMessage test";
+
+    @Test
+    public void testGetBody() {
+        JmsTextMessage jmsTextMessage = new JmsTextMessage(text);
+        try {
+            Assert.assertEquals(jmsTextMessage.getBody(String.class), text);
+        }
+        catch (JMSException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Test
+    public void testSetGetText() {
+        JmsTextMessage jmsTextMessage = new JmsTextMessage();
+        jmsTextMessage.setText(text);
+        try {
+            Assert.assertEquals(jmsTextMessage.getText(), text);
+        }
+        catch (JMSException e) {
+            e.printStackTrace();
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/IntegrationTestBase.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/IntegrationTestBase.java
new file mode 100644
index 0000000..02fe111
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/IntegrationTestBase.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration;
+
+import org.apache.rocketmq.broker.BrokerController;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.BrokerConfig;
+import org.apache.rocketmq.common.TopicConfig;
+import org.apache.rocketmq.common.namesrv.NamesrvConfig;
+import org.apache.rocketmq.jms.domain.CommonConstant;
+import org.apache.rocketmq.namesrv.NamesrvController;
+import org.apache.rocketmq.remoting.netty.NettyClientConfig;
+import org.apache.rocketmq.remoting.netty.NettyServerConfig;
+import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class IntegrationTestBase {
+    public static Logger logger = LoggerFactory.getLogger(IntegrationTestBase.class);
+
+    protected static Random random = new Random();
+    protected static final String SEP = File.separator;
+
+
+    protected static String topic = "jms-test";
+    protected static String topic2 = "jms-test-2";
+    protected static String messageType = "TagA";
+    protected static String producerId = "PID-jms-test";
+    protected static String consumerId = "CID-jms-test";
+    protected static String consumerId2 = "CID-jms-test-2";
+    protected static String nameServer;
+    protected static String text = "English test";
+    protected static int consumeThreadNums = 16;
+
+
+
+
+    protected static final String BROKER_NAME_PREFIX = "TestBrokerName_";
+    protected static final AtomicInteger BROKER_INDEX = new AtomicInteger(0);
+    protected static final List<File> TMPE_FILES = new ArrayList<File>();
+    protected static final List<BrokerController> BROKER_CONTROLLERS =  new ArrayList<BrokerController>();
+    protected static final List<NamesrvController> NAMESRV_CONTROLLERS = new ArrayList<NamesrvController>();
+
+
+    private static String createBaseDir() {
+        String baseDir = System.getProperty("user.home") + SEP + "unitteststore-" + UUID.randomUUID();
+        final File file = new File(baseDir);
+        if (file.exists()) {
+            System.out.println(String.format("[%s] has already existed, please bake up and remove it for integration tests", baseDir));
+            System.exit(1);
+        }
+        TMPE_FILES.add(file);
+        return baseDir;
+    }
+
+    public static NamesrvController createAndStartNamesrv() {
+        String baseDir = createBaseDir();
+        NamesrvConfig namesrvConfig = new NamesrvConfig();
+        NettyServerConfig nameServerNettyServerConfig = new NettyServerConfig();
+        namesrvConfig.setKvConfigPath(baseDir + SEP + "namesrv" + SEP + "kvConfig.json");
+
+        nameServerNettyServerConfig.setListenPort(9000 + random.nextInt(1000));
+        NamesrvController namesrvController = new NamesrvController(namesrvConfig, nameServerNettyServerConfig);
+        try {
+            Assert.assertTrue(namesrvController.initialize());
+            logger.info("Name Server Start:{}", nameServerNettyServerConfig.getListenPort());
+            namesrvController.start();
+        } catch (Exception e) {
+            System.out.println("Name Server start failed");
+            System.exit(1);
+        }
+        NAMESRV_CONTROLLERS.add(namesrvController);
+        return namesrvController;
+
+    }
+
+
+    public static BrokerController createAndStartBroker(String nsAddr) {
+        String baseDir = createBaseDir();
+        BrokerConfig brokerConfig = new BrokerConfig();
+        NettyServerConfig nettyServerConfig = new NettyServerConfig();
+        NettyClientConfig nettyClientConfig = new NettyClientConfig();
+        MessageStoreConfig storeConfig = new MessageStoreConfig();
+        brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement());
+        brokerConfig.setBrokerIP1("127.0.0.1");
+        brokerConfig.setNamesrvAddr(nsAddr);
+        storeConfig.setStorePathRootDir(baseDir);
+        storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
+        storeConfig.setHaListenPort(8000 + random.nextInt(1000));
+        nettyServerConfig.setListenPort(10000 + random.nextInt(1000));
+        BrokerController brokerController = new BrokerController(brokerConfig, nettyServerConfig, nettyClientConfig, storeConfig);
+        try {
+            Assert.assertTrue(brokerController.initialize());
+            logger.info("Broker Start name:{} addr:{}", brokerConfig.getBrokerName(), brokerController.getBrokerAddr());
+            brokerController.start();
+        } catch (Exception e) {
+            System.out.println("Broker start failed");
+            System.exit(1);
+        }
+        BROKER_CONTROLLERS.add(brokerController);
+        return brokerController;
+    }
+
+
+
+    protected static DefaultMQAdminExt defaultMQAdminExt;
+
+    static {
+        //clear the environment
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override public void run() {
+                if (defaultMQAdminExt != null) {
+                    defaultMQAdminExt.shutdown();
+                }
+                for (NamesrvController namesrvController: NAMESRV_CONTROLLERS) {
+                    if (namesrvController != null) {
+                        namesrvController.shutdown();
+                    }
+                }
+                for (BrokerController brokerController: BROKER_CONTROLLERS) {
+                    if (brokerController != null) {
+                        brokerController.shutdown();
+                    }
+                }
+                for (File file : TMPE_FILES) {
+                    deleteFile(file);
+                }
+            }
+        });
+
+
+        NamesrvController namesrvController = IntegrationTestBase.createAndStartNamesrv();
+        nameServer = "127.0.0.1:" + namesrvController.getNettyServerConfig().getListenPort();
+        BrokerController brokerController = createAndStartBroker(nameServer);
+
+        defaultMQAdminExt = new DefaultMQAdminExt();
+        defaultMQAdminExt.setNamesrvAddr(nameServer);
+        try {
+            defaultMQAdminExt.start();
+        } catch (MQClientException e) {
+            System.out.println("DefaultMQAdminExt start failed");
+            System.exit(1);
+        }
+
+        createTopic(topic, brokerController.getBrokerAddr());
+
+
+    }
+
+    public static void deleteFile(File file) {
+        if (!file.exists()) {
+            return;
+        }
+        if (file.isFile()) {
+            file.delete();
+        } else if (file.isDirectory()) {
+            File[] files = file.listFiles();
+            for (int i = 0;i < files.length;i ++) {
+                deleteFile(files[i]);
+            }
+            file.delete();
+        }
+    }
+    public static void createTopic(String topic, String addr) {
+        TopicConfig topicConfig = new TopicConfig();
+        topicConfig.setTopicName(topic);
+        topicConfig.setReadQueueNums(4);
+        topicConfig.setWriteQueueNums(4);
+        try {
+            defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
+        } catch (Exception e) {
+            logger.error("Create topic:{} addr:{} failed", addr, topic);
+        }
+    }
+
+}
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsClientIT.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsClientIT.java
new file mode 100644
index 0000000..367700a
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsClientIT.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration;
+
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.jms.JmsTestListener;
+import org.apache.rocketmq.jms.JmsTestUtil;
+import org.apache.rocketmq.jms.domain.CommonConstant;
+import org.apache.rocketmq.jms.domain.JmsBaseConnectionFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+import static org.apache.rocketmq.jms.JmsTestUtil.getRMQPushConsumerExt;
+
+public class JmsClientIT extends IntegrationTestBase {
+
+    @Test
+    public void testConfigInURI() throws Exception {
+        JmsBaseConnectionFactory connectionFactory = new JmsBaseConnectionFactory(new
+            URI(String.format("rocketmq://xxx?%s=%s&%s=%s&%s=%s&%s=%s&%s=%s&%s=%s",
+            CommonConstant.PRODUCERID, producerId,
+            CommonConstant.CONSUMERID, consumerId,
+            CommonConstant.NAMESERVER, nameServer,
+            CommonConstant.CONSUME_THREAD_NUMS, consumeThreadNums,
+            CommonConstant.SEND_TIMEOUT_MILLIS, 10*1000,
+            CommonConstant.INSTANCE_NAME, "JMS_TEST")));
+
+        Connection connection = connectionFactory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        connection.start();
+        try {
+            Destination destination = session.createTopic(topic + ":" + messageType);
+            session.createConsumer(destination);
+            session.createProducer(destination);
+
+            DefaultMQPushConsumer rmqPushConsumer = (DefaultMQPushConsumer) getRMQPushConsumerExt(consumerId).getConsumer();
+            Assert.assertNotNull(rmqPushConsumer);
+            Assert.assertEquals(consumerId, rmqPushConsumer.getConsumerGroup());
+            Assert.assertEquals("JMS_TEST", rmqPushConsumer.getInstanceName());
+            Assert.assertEquals(consumeThreadNums, rmqPushConsumer.getConsumeThreadMax());
+            Assert.assertEquals(consumeThreadNums, rmqPushConsumer.getConsumeThreadMin());
+            Assert.assertEquals(nameServer, rmqPushConsumer.getNamesrvAddr());
+
+            DefaultMQProducer mqProducer = (DefaultMQProducer) JmsTestUtil.getMQProducer(producerId);
+            Assert.assertNotNull(mqProducer);
+            Assert.assertEquals(producerId, mqProducer.getProducerGroup());
+            Assert.assertEquals("JMS_TEST", mqProducer.getInstanceName());
+            Assert.assertEquals(10 * 1000, mqProducer.getSendMsgTimeout());
+            Assert.assertEquals(nameServer, mqProducer.getNamesrvAddr());
+
+            Thread.sleep(2000);
+        }
+        finally {
+            connection.close();
+        }
+
+    }
+
+
+    private Connection createConnection(String producerGroup, String consumerGroup) throws Exception {
+        JmsBaseConnectionFactory connectionFactory = new JmsBaseConnectionFactory(new
+            URI(String.format("rocketmq://xxx?%s=%s&%s=%s&%s=%s&%s=%s&%s=%s&%s=%s",
+            CommonConstant.PRODUCERID, producerGroup,
+            CommonConstant.CONSUMERID, consumerGroup,
+            CommonConstant.NAMESERVER, nameServer,
+            CommonConstant.CONSUME_THREAD_NUMS, consumeThreadNums,
+            CommonConstant.SEND_TIMEOUT_MILLIS, 10*1000,
+            CommonConstant.INSTANCE_NAME, "JMS_TEST")));
+        return  connectionFactory.createConnection();
+    }
+
+    @Test
+    public void testProducerAndConsume_TwoConsumer() throws Exception {
+
+        Connection connection = createConnection(producerId, consumerId);
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Destination destinationA = session.createTopic("TopicA");
+        Destination destinationB = session.createTopic("TopicB");
+        final CountDownLatch countDownLatch = new CountDownLatch(2);
+        JmsTestListener listenerA = new JmsTestListener(10,countDownLatch);
+        JmsTestListener listenerB = new JmsTestListener(10, countDownLatch);
+
+        try {
+            //two consumers
+            MessageConsumer messageConsumerA = session.createConsumer(destinationA);
+            messageConsumerA.setMessageListener(listenerA);
+            MessageConsumer messageConsumerB = session.createConsumer(destinationB);
+            messageConsumerB.setMessageListener(listenerB);
+            //producer
+            MessageProducer messageProducer = session.createProducer(destinationA);
+            connection.start();
+
+            for (int i = 0; i < 10; i++) {
+                TextMessage message = session.createTextMessage(text + i);
+                Assert.assertNull(message.getJMSMessageID());
+                messageProducer.send(message);
+                Assert.assertNotNull(message.getJMSMessageID());
+            }
+            for (int i = 0; i < 10; i++) {
+                TextMessage message = session.createTextMessage(text + i);
+                Assert.assertNull(message.getJMSMessageID());
+                messageProducer.send(destinationB, message);
+                Assert.assertNotNull(message.getJMSMessageID());
+            }
+
+            if (countDownLatch.await(30, TimeUnit.SECONDS)) {
+                Thread.sleep(2000);
+            }
+            Assert.assertEquals(10, listenerA.getConsumedNum());
+            Assert.assertEquals(10, listenerB.getConsumedNum());
+        }
+        finally {
+            //Close the connection
+            connection.close();
+        }
+
+    }
+
+    @Test
+    public void testProducerAndConsume_TagFilter() throws Exception {
+        Connection connection = createConnection(producerId, consumerId);
+        Connection anotherConnection = createConnection(producerId, consumerId +"other");
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Session anotherSession = anotherConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        Destination destinationA = session.createTopic("topic:tagA");
+        Destination destinationB = session.createTopic("topic:tagB");
+        final CountDownLatch countDownLatch = new CountDownLatch(2);
+        JmsTestListener listenerForTagA =  new JmsTestListener(10, countDownLatch);
+        JmsTestListener listenerForAll = new JmsTestListener(40, countDownLatch);
+        try {
+            session.createConsumer(destinationA).setMessageListener(listenerForTagA);
+            anotherSession.createConsumer(session.createTopic("topic")).setMessageListener(listenerForAll);
+            //producer
+            MessageProducer messageProducer = session.createProducer(destinationA);
+            connection.start();
+            anotherConnection.start();
+
+            for (int i = 0; i < 20; i++) {
+                TextMessage message = session.createTextMessage(text + i);
+                Assert.assertNull(message.getJMSMessageID());
+                messageProducer.send(message);
+                Assert.assertNotNull(message.getJMSMessageID());
+            }
+            for (int i = 0; i < 20; i++) {
+                TextMessage message = session.createTextMessage(text + i);
+                Assert.assertNull(message.getJMSMessageID());
+                messageProducer.send(destinationB, message);
+                Assert.assertNotNull(message.getJMSMessageID());
+            }
+
+            if (countDownLatch.await(30, TimeUnit.SECONDS)) {
+                Thread.sleep(2000);
+            }
+            Assert.assertEquals(20, listenerForTagA.getConsumedNum());
+            Assert.assertEquals(40, listenerForAll.getConsumedNum());
+        }
+        finally {
+            //Close the connection
+            connection.close();
+            anotherConnection.close();
+        }
+
+    }
+
+}
\ No newline at end of file
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsConsumerIT.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsConsumerIT.java
new file mode 100644
index 0000000..6cbb7b1
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/integration/JmsConsumerIT.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.integration;
+
+import java.net.URI;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import org.apache.rocketmq.jms.domain.JmsBaseConnectionFactory;
+import org.apache.rocketmq.jms.domain.JmsBaseMessageConsumer;
+import org.apache.rocketmq.jms.domain.RMQPushConsumerExt;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.rocketmq.jms.JmsTestUtil.checkConsumerState;
+import static org.apache.rocketmq.jms.JmsTestUtil.getRMQPushConsumerExt;
+
+public class JmsConsumerIT extends IntegrationTestBase {
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+
+    private  MessageListener listener = new MessageListener() {
+        @Override
+        public void onMessage(Message message) {
+            try {
+                Assert.assertNotNull(message);
+                Assert.assertNotNull(message.getJMSMessageID());
+            }
+            catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    };
+
+
+    @Test
+    public void testStartIdempotency() throws Exception {
+        JmsBaseConnectionFactory connectionFactory = new JmsBaseConnectionFactory(new
+            URI("rocketmq://xxx?consumerId=" + consumerId + "&nameServer=" + nameServer));
+        Connection connection = connectionFactory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        checkConsumerState(consumerId, true, false);
+        try {
+            Destination destination = session.createTopic(topic + ":" + messageType);
+            MessageConsumer consumer = session.createConsumer(destination);
+            consumer.setMessageListener(listener);
+
+            checkConsumerState(consumerId, false, false);
+
+            ((JmsBaseMessageConsumer) consumer).startConsumer();
+            checkConsumerState(consumerId, false, true);
+
+            Destination destination1 = session.createTopic(topic2 + ":" + messageType);
+            MessageConsumer consumer1 = session.createConsumer(destination1);
+            consumer1.setMessageListener(listener);
+
+            ((JmsBaseMessageConsumer) consumer1).startConsumer();
+            checkConsumerState(consumerId, false, true);
+
+            //the start is idempotent
+            connection.start();
+            connection.start();
+
+            Thread.sleep(5000);
+        }
+        finally {
+            connection.close();
+        }
+    }
+
+    @Test
+    public void testReferenceCount() throws Exception {
+        JmsBaseConnectionFactory connectionFactory = new JmsBaseConnectionFactory(new
+            URI("rocketmq://xxx?consumerId=" + consumerId + "&nameServer=" + nameServer));
+        Connection connection = connectionFactory.createConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        connection.start();
+        try {
+            Destination destination = session.createTopic(topic + ":" + messageType);
+            MessageConsumer consumer = session.createConsumer(destination);
+            consumer.setMessageListener(listener);
+
+            RMQPushConsumerExt rmqPushConsumerExt = getRMQPushConsumerExt(consumerId);
+            Assert.assertNotNull(rmqPushConsumerExt);
+            Assert.assertEquals(1, rmqPushConsumerExt.getReferenceCount());
+
+
+            MessageConsumer consumer2 = session.createConsumer(destination);
+            Assert.assertEquals(2, rmqPushConsumerExt.getReferenceCount());
+
+            MessageConsumer consumer3 = session.createConsumer(session.createTopic(topic + ":" + messageType));
+
+            Assert.assertEquals(3, rmqPushConsumerExt.getReferenceCount());
+
+            session.close();
+
+            Assert.assertEquals(0, rmqPushConsumerExt.getReferenceCount());
+            Assert.assertEquals(false, rmqPushConsumerExt.isStarted());
+            Assert.assertNull(getRMQPushConsumerExt(consumerId));
+
+            Thread.sleep(5000);
+        }
+        finally {
+            connection.close();
+        }
+    }
+
+}
diff --git a/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/util/URISpecParserTest.java b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/util/URISpecParserTest.java
new file mode 100644
index 0000000..af57f67
--- /dev/null
+++ b/rocketmq-jms/core/src/test/java/org/apache/rocketmq/jms/util/URISpecParserTest.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.util;
+
+import java.util.Map;
+import org.junit.Test;
+
+public class URISpecParserTest {
+
+    @Test
+    public void parseURI_NormalTest() {
+        Map<String, String> result = URISpecParser.parseURI("rocketmq://localhost");
+        System.out.println(result);
+
+        result = URISpecParser
+            .parseURI("rocketmq://xxx?appId=test&consumerId=testGroup");
+        System.out.println(result);
+
+        result = URISpecParser.parseURI("rocketmq:!@#$%^&*()//localhost?appId=test!@#$%^&*()");
+        System.out.println(result);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void parseURI_AbnormalTest() {
+        URISpecParser.parseURI("metaq3://localhost?appId=test");
+    }
+
+}
\ No newline at end of file
diff --git a/rocketmq-jms/pom.xml b/rocketmq-jms/pom.xml
new file mode 100644
index 0000000..2ecbd42
--- /dev/null
+++ b/rocketmq-jms/pom.xml
@@ -0,0 +1,196 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.apache.rocketmq</groupId>
+    <artifactId>rocketmq-jms-all</artifactId>
+    <packaging>pom</packaging>
+    <version>1.0.0</version>
+    <modules>
+        <module>spring</module>
+        <module>core</module>
+    </modules>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <!--maven properties -->
+        <maven.test.skip>false</maven.test.skip>
+        <maven.javadoc.skip>true</maven.javadoc.skip>
+        <!-- compiler settings properties -->
+        <maven.compiler.source>1.6</maven.compiler.source>
+        <maven.compiler.target>1.6</maven.compiler.target>
+        <surefire.version>2.19.1</surefire.version>
+        <rocketmq.version>4.0.0-incubating</rocketmq.version>
+
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>javax.jms</groupId>
+            <artifactId>jms-api</artifactId>
+            <version>1.1-rev-1</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>18.0</version>
+        </dependency>
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+            <version>2.6</version>
+        </dependency>
+
+        <!--test-->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+            <version>4.12</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-namesrv</artifactId>
+            <version>${rocketmq.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-broker</artifactId>
+            <version>${rocketmq.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+
+    <build>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.5.1</version>
+                <configuration>
+                    <source>${maven.compiler.source}</source>
+                    <target>${maven.compiler.target}</target>
+                    <compilerVersion>${maven.compiler.source}</compilerVersion>
+                    <showDeprecation>true</showDeprecation>
+                    <showWarnings>true</showWarnings>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.jacoco</groupId>
+                <artifactId>jacoco-maven-plugin</artifactId>
+                <version>0.7.8</version>
+                <executions>
+                    <execution>
+                        <id>default-prepare-agent</id>
+                        <goals>
+                            <goal>prepare-agent</goal>
+                        </goals>
+                        <configuration>
+                            <destFile>${project.build.directory}/jacoco.exec</destFile>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>default-prepare-agent-integration</id>
+                        <phase>pre-integration-test</phase>
+                        <goals>
+                            <goal>prepare-agent-integration</goal>
+                        </goals>
+                        <configuration>
+                            <destFile>${project.build.directory}/jacoco-it.exec</destFile>
+                            <propertyName>failsafeArgLine</propertyName>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>default-report</id>
+                        <goals>
+                            <goal>report</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>default-report-integration</id>
+                        <goals>
+                            <goal>report-integration</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>${surefire.version}</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <version>${surefire.version}</version>
+                <configuration>
+                    <forkCount>1</forkCount>
+                    <reuseForks>true</reuseForks>
+                    <argLine>@{failsafeArgLine}</argLine>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>integration-test</goal>
+                            <goal>verify</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.eluder.coveralls</groupId>
+                <artifactId>coveralls-maven-plugin</artifactId>
+                <version>4.3.0</version>
+            </plugin>
+            <plugin>
+                <artifactId>maven-checkstyle-plugin</artifactId>
+                <version>2.17</version>
+                <executions>
+                    <execution>
+                        <id>verify</id>
+                        <phase>verify</phase>
+                        <configuration>
+                            <configLocation>style/rmq_checkstyle.xml</configLocation>
+                            <encoding>UTF-8</encoding>
+                            <consoleOutput>true</consoleOutput>
+                            <failsOnError>true</failsOnError>
+                        </configuration>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <filtering>true</filtering>
+            </resource>
+        </resources>
+    </build>
+
+</project>
\ No newline at end of file
diff --git a/rocketmq-jms/spring/pom.xml b/rocketmq-jms/spring/pom.xml
new file mode 100644
index 0000000..b2afe12
--- /dev/null
+++ b/rocketmq-jms/spring/pom.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>rocketmq-jms-all</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
+        <version>1.0.0</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>rocketmq-jms-spring</artifactId>
+    <version>1.0.0</version>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <spring-version>4.1.4.RELEASE</spring-version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>${project.parent.groupId}</groupId>
+            <artifactId>rocketmq-jms</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+
+        <!-- spring 2.5.6 -->
+        <!--
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring</artifactId>
+            <version>2.5.6.SEC03</version>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-test</artifactId>
+            <version>2.5.6.SEC03</version>
+            <scope>test</scope>
+        </dependency>
+          -->
+        <!-- spring 3 or 4 -->
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-jms</artifactId>
+            <version>${spring-version}</version>
+            <optional>true</optional>
+        </dependency>
+
+        <!-- for test -->
+        <dependency>
+            <groupId>${project.parent.groupId}</groupId>
+            <artifactId>rocketmq-jms</artifactId>
+            <version>${project.parent.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+            <version>6.8.21</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/rocketmq-jms/spring/src/main/java/org/apache/rocketmq/jms/spring/SimpleExMessageListenerContainer.java b/rocketmq-jms/spring/src/main/java/org/apache/rocketmq/jms/spring/SimpleExMessageListenerContainer.java
new file mode 100644
index 0000000..efbfbe9
--- /dev/null
+++ b/rocketmq-jms/spring/src/main/java/org/apache/rocketmq/jms/spring/SimpleExMessageListenerContainer.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.spring;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import org.springframework.jms.listener.SimpleMessageListenerContainer;
+
+
+public class SimpleExMessageListenerContainer extends SimpleMessageListenerContainer {
+
+    private String cacheLevelName;
+
+    /**
+     * Create a MessageConsumer for the given JMS Session, registering a
+     * MessageListener for the specified listener.
+     *
+     * @param session
+     *         the JMS Session to work on
+     *
+     * @return the MessageConsumer
+     *
+     * @throws JMSException
+     *         if thrown by JMS methods
+     * @see #executeListener
+     */
+    protected MessageConsumer createListenerConsumer(final Session session) throws JMSException {
+        Destination destination = getDestination();
+        if (destination == null) {
+            destination = resolveDestinationName(session, getDestinationName());
+        }
+        MessageConsumer consumer = createConsumer(session, destination);
+        consumer.setMessageListener((MessageListener) super.getMessageListener());
+        return consumer;
+    }
+
+    /**
+     * Create a JMS MessageConsumer for the given Session and Destination.
+     * <p>
+     * This implementation uses JMS 1.1 API.
+     *
+     * @param session
+     *         the JMS Session to create a MessageConsumer for
+     * @param destination
+     *         the JMS Destination to create a MessageConsumer for
+     *
+     * @return the new JMS MessageConsumer
+     *
+     * @throws JMSException
+     *         if thrown by JMS API methods
+     */
+    protected MessageConsumer createConsumer(Session session, Destination destination)
+            throws JMSException {
+        //ONS not support message selector and other features nowadays
+        return session.createConsumer(destination);
+    }
+
+    /**
+     * @return the cacheLevelName
+     */
+    public String getCacheLevelName() {
+        return cacheLevelName;
+    }
+
+    /**
+     * @param cacheLevelName
+     *         the cacheLevelName to set
+     */
+    public void setCacheLevelName(String cacheLevelName) {
+        this.cacheLevelName = cacheLevelName;
+    }
+}
diff --git a/rocketmq-jms/spring/src/test/java/org/apache/rocketmq/jms/spring/JmsConsumeIT.java b/rocketmq-jms/spring/src/test/java/org/apache/rocketmq/jms/spring/JmsConsumeIT.java
new file mode 100644
index 0000000..03da06e
--- /dev/null
+++ b/rocketmq-jms/spring/src/test/java/org/apache/rocketmq/jms/spring/JmsConsumeIT.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.spring;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import org.apache.rocketmq.jms.JmsTestListener;
+import org.apache.rocketmq.jms.domain.message.JmsTextMessage;
+import org.springframework.jms.core.ProducerCallback;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class JmsConsumeIT extends JmsProduceIT {
+
+
+    @Test
+    public void testConsume() throws Exception {
+        final Topic topic = (Topic) consumeContext.getBean("baseTopic");
+        JmsTestListener messageListener = (JmsTestListener) consumeContext.getBean("messageListener");
+        CountDownLatch countDownLatch = new CountDownLatch(1);
+        messageListener.setLatch(countDownLatch);
+        messageListener.setExpectd(30);
+        consumeContext.start();
+
+        for (int i = 0; i < 30; i++) {
+            jmsTemplate.execute(topic, new ProducerCallback() {
+                @Override
+                public Object doInJms(Session session, MessageProducer producer) throws JMSException {
+                    JmsTextMessage message = (JmsTextMessage) session.createTextMessage("hello world,kafka, haha");
+                    producer.send(topic, message);
+                    Assert.assertNotNull(message.getJMSMessageID());
+                    return message;
+                }
+            });
+        }
+        if (countDownLatch.await(30, TimeUnit.SECONDS)) {
+            Thread.sleep(2000);
+        }
+        Assert.assertEquals(30, messageListener.getConsumedNum());
+        consumeContext.close();
+    }
+}
diff --git a/rocketmq-jms/spring/src/test/java/org/apache/rocketmq/jms/spring/JmsProduceIT.java b/rocketmq-jms/spring/src/test/java/org/apache/rocketmq/jms/spring/JmsProduceIT.java
new file mode 100644
index 0000000..bcb968b
--- /dev/null
+++ b/rocketmq-jms/spring/src/test/java/org/apache/rocketmq/jms/spring/JmsProduceIT.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.spring;
+
+import com.google.common.collect.Lists;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import org.apache.rocketmq.jms.domain.message.JmsBytesMessage;
+import org.apache.rocketmq.jms.domain.message.JmsObjectMessage;
+import org.apache.rocketmq.jms.domain.message.JmsTextMessage;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.ProducerCallback;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class JmsProduceIT extends SpringTestBase {
+
+    protected JmsTemplate jmsTemplate = (JmsTemplate) produceContext.getBean("jmsTemplate");
+
+    private Topic destination = (Topic) produceContext.getBean("destination");
+
+    @Test
+    public void simpleSendTest() throws Exception {
+        //Send text message
+        jmsTemplate.execute(destination, new ProducerCallback() {
+            @Override
+            public Object doInJms(Session session, MessageProducer producer) throws JMSException {
+                JmsTextMessage message = (JmsTextMessage) session.createTextMessage("hello world,kafka");
+                producer.send(destination, message);
+                Assert.assertNotNull(message.getJMSMessageID());
+                return message;
+            }
+        });
+
+        //Send object message
+        jmsTemplate.execute(destination, new ProducerCallback() {
+            @Override
+            public Object doInJms(Session session, MessageProducer producer) throws JMSException {
+                JmsObjectMessage message = (JmsObjectMessage) session.createObjectMessage(Lists.newArrayList(1, 2, 3));
+                producer.send(destination, message);
+                Assert.assertNotNull(message.getJMSMessageID());
+                return message;
+            }
+        });
+
+        //Send byte message
+        jmsTemplate.execute(destination, new ProducerCallback() {
+            @Override
+            public Object doInJms(Session session, MessageProducer producer) throws JMSException {
+                byte[] ts = "Von,Test".getBytes();
+                JmsBytesMessage message = (JmsBytesMessage) session.createBytesMessage();
+                message.writeBytes(ts);
+                producer.send(destination, message);
+                Assert.assertNotNull(message.getJMSMessageID());
+                return message;
+            }
+        });
+    }
+
+
+    @Test(threadPoolSize = 2, invocationCount = 20)
+    public void multiSenderTest() throws Exception {
+        jmsTemplate.execute(destination, new ProducerCallback() {
+            @Override
+            public Object doInJms(Session session, MessageProducer producer) throws JMSException {
+                byte[] ts = "Von,Multi thread sender test".getBytes();
+                JmsBytesMessage message = (JmsBytesMessage) session.createBytesMessage();
+                message.writeBytes(ts);
+                producer.send(destination, message);
+                Assert.assertNotNull(message.getJMSMessageID());
+                return message;
+            }
+        });
+    }
+}
diff --git a/rocketmq-jms/spring/src/test/java/org/apache/rocketmq/jms/spring/SpringTestBase.java b/rocketmq-jms/spring/src/test/java/org/apache/rocketmq/jms/spring/SpringTestBase.java
new file mode 100644
index 0000000..d830a3e
--- /dev/null
+++ b/rocketmq-jms/spring/src/test/java/org/apache/rocketmq/jms/spring/SpringTestBase.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.rocketmq.jms.spring;
+
+import org.apache.rocketmq.jms.domain.CommonConstant;
+import org.apache.rocketmq.jms.integration.IntegrationTestBase;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+
+public class SpringTestBase extends IntegrationTestBase{
+
+    protected final static ClassPathXmlApplicationContext produceContext;
+    protected final static ClassPathXmlApplicationContext consumeContext;
+
+    static {
+        String rmqJmsUrl = String.format("rocketmq://xxx?%s=%s&%s=%s&%s=%s&%s=%s&%s=%s&%s=%s",
+            CommonConstant.PRODUCERID, producerId,
+            CommonConstant.CONSUMERID, consumerId,
+            CommonConstant.NAMESERVER, nameServer,
+            CommonConstant.CONSUME_THREAD_NUMS, consumeThreadNums,
+            CommonConstant.SEND_TIMEOUT_MILLIS, 10*1000,
+            CommonConstant.INSTANCE_NAME, "JMS_TEST");
+        System.setProperty("RMQ_JMS_URL", rmqJmsUrl);
+        produceContext = new ClassPathXmlApplicationContext("classpath:producer.xml");
+        consumeContext = new ClassPathXmlApplicationContext("classpath:consumer.xml");
+    }
+}
diff --git a/rocketmq-jms/spring/src/test/resources/consumer.xml b/rocketmq-jms/spring/src/test/resources/consumer.xml
new file mode 100644
index 0000000..0d46d8f
--- /dev/null
+++ b/rocketmq-jms/spring/src/test/resources/consumer.xml
@@ -0,0 +1,51 @@
+<?xml version="1.0" encoding="utf-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+	   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms"
+	   xmlns:context="http://www.springframework.org/schema/context"
+	   xsi:schemaLocation="http://www.springframework.org/schema/beans
+	http://www.springframework.org/schema/beans/spring-beans.xsd
+    http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
+
+	<context:property-placeholder/>
+
+	<!-- Consumer config -->
+	<bean id="jmsConsumerConnectionFactory"
+		class="org.apache.rocketmq.jms.domain.JmsBaseConnectionFactory">
+		<property name="connectionUri" value="${RMQ_JMS_URL}" />
+	</bean>
+
+    <bean id="baseTopic"
+          class="org.apache.rocketmq.jms.domain.JmsBaseTopic">
+        <constructor-arg index="0" value="RMQ_JMS_TEST_CONSUME" /><!--topic -->
+        <constructor-arg index="1" value="baseType" /><!-- messageType -->
+    </bean>
+    
+	<bean id="messageListener"
+		class="org.apache.rocketmq.jms.JmsTestListener"/>
+
+		
+	<jms:listener-container
+		container-class="org.apache.rocketmq.jms.spring.SimpleExMessageListenerContainer"
+		connection-factory="jmsConsumerConnectionFactory" destination-type="durableTopic">
+		<jms:listener destination="RMQ_JMS_TEST_CONSUME:baseType"
+			ref="messageListener" method="onMessage" />
+	</jms:listener-container>
+
+</beans>
\ No newline at end of file
diff --git a/rocketmq-jms/spring/src/test/resources/producer.xml b/rocketmq-jms/spring/src/test/resources/producer.xml
new file mode 100644
index 0000000..a99343a
--- /dev/null
+++ b/rocketmq-jms/spring/src/test/resources/producer.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="utf-8" ?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:context="http://www.springframework.org/schema/context"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans
+	http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
+    <context:property-placeholder/>
+    <!-- Producer config -->
+    <bean id="jmsProducerConnectionFactory"
+          class="org.apache.rocketmq.jms.domain.JmsBaseConnectionFactory">
+        <property name="connectionUri" value="${RMQ_JMS_URL}" />
+    </bean>
+
+    <bean id="destination"
+          class="org.apache.rocketmq.jms.domain.JmsBaseTopic">
+        <constructor-arg index="0" value="RMQ_JMS_TEST_PRODUCE" /><!--topic -->
+        <constructor-arg index="1" value="baseType" /><!-- messageType -->
+    </bean>
+
+    <bean id="jmsTemplate"
+          class="org.springframework.jms.core.JmsTemplate">
+        <property name="connectionFactory" ref="jmsProducerConnectionFactory" />
+    </bean>
+
+
+</beans>
\ No newline at end of file
diff --git a/rocketmq-jms/style/copyright/Apache.xml b/rocketmq-jms/style/copyright/Apache.xml
new file mode 100644
index 0000000..2db86d0
--- /dev/null
+++ b/rocketmq-jms/style/copyright/Apache.xml
@@ -0,0 +1,24 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<component name="CopyrightManager">
+    <copyright>
+        <option name="myName" value="Apache"/>
+        <option name="notice"
+                value="Licensed to the Apache Software Foundation (ASF) under one or more&#10;contributor license agreements.  See the NOTICE file distributed with&#10;this work for additional information regarding copyright ownership.&#10;The ASF licenses this file to You under the Apache License, Version 2.0&#10;(the &quot;License&quot;); you may not use this file except in compliance with&#10;the License.  You may obtain a copy of the License at&#10;&#10;    http://www.apache.org/licenses/LICENSE-2.0&#10;&#10;Unless required by applicable law or agreed to in writing, software&#10;distributed under the License is distributed on an &quot;AS IS&quot; BASIS,&#10;WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.&#10;See the License for the specific language governing permissions and&#10;limitations under the License."/>
+    </copyright>
+</component>
\ No newline at end of file
diff --git a/rocketmq-jms/style/copyright/profiles_settings.xml b/rocketmq-jms/style/copyright/profiles_settings.xml
new file mode 100644
index 0000000..4c0e521
--- /dev/null
+++ b/rocketmq-jms/style/copyright/profiles_settings.xml
@@ -0,0 +1,64 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<component name="CopyrightManager">
+    <settings default="Apache">
+        <module2copyright>
+            <element module="All" copyright="Apache"/>
+        </module2copyright>
+        <LanguageOptions name="GSP">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="HTML">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="JAVA">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="addBlankAfter" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="JSP">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="JSPX">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="MXML">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="Properties">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="block" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="SPI">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="block" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="XML">
+            <option name="fileTypeOverride" value="3"/>
+            <option name="prefixLines" value="false"/>
+        </LanguageOptions>
+        <LanguageOptions name="__TEMPLATE__">
+            <option name="separateBefore" value="true"/>
+            <option name="lenBefore" value="1"/>
+        </LanguageOptions>
+    </settings>
+</component>
\ No newline at end of file
diff --git a/rocketmq-jms/style/rmq_checkstyle.xml b/rocketmq-jms/style/rmq_checkstyle.xml
new file mode 100644
index 0000000..e3155cc
--- /dev/null
+++ b/rocketmq-jms/style/rmq_checkstyle.xml
@@ -0,0 +1,135 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<!DOCTYPE module PUBLIC
+    "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
+    "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
+<!--Refer http://checkstyle.sourceforge.net/reports/google-java-style.html#s2.2-file-encoding -->
+<module name="Checker">
+
+    <property name="localeLanguage" value="en"/>
+
+    <!--To configure the check to report on the first instance in each file-->
+    <module name="FileTabCharacter"/>
+
+    <!-- header -->
+    <module name="RegexpHeader">
+        <property name="header" value="/\*\nLicensed to the Apache Software Foundation*"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="System\.out\.println"/>
+        <property name="message" value="Prohibit invoking System.out.println in source code !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="//FIXME"/>
+        <property name="message" value="Recommended fix FIXME task !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="//TODO"/>
+        <property name="message" value="Recommended fix TODO task !"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format" value="@alibaba"/>
+        <property name="message" value="Recommended remove @alibaba keyword!"/>
+    </module>
+    <module name="RegexpSingleline">
+        <property name="format" value="@taobao"/>
+        <property name="message" value="Recommended remove @taobao keyword!"/>
+    </module>
+    <module name="RegexpSingleline">
+        <property name="format" value="@author"/>
+        <property name="message" value="Recommended remove @author tag in javadoc!"/>
+    </module>
+
+    <module name="RegexpSingleline">
+        <property name="format"
+                  value=".*[\u3400-\u4DB5\u4E00-\u9FA5\u9FA6-\u9FBB\uF900-\uFA2D\uFA30-\uFA6A\uFA70-\uFAD9\uFF00-\uFFEF\u2E80-\u2EFF\u3000-\u303F\u31C0-\u31EF]+.*"/>
+        <property name="message" value="Not allow chinese character !"/>
+    </module>
+
+    <module name="FileLength">
+        <property name="max" value="3000"/>
+    </module>
+
+    <module name="TreeWalker">
+
+        <module name="UnusedImports">
+            <property name="processJavadoc" value="true"/>
+        </module>
+        <module name="RedundantImport"/>
+
+        <!--<module name="IllegalImport" />-->
+
+        <!--Checks that classes that override equals() also override hashCode()-->
+        <module name="EqualsHashCode"/>
+        <!--Checks for over-complicated boolean expressions. Currently finds code like if (topic == true), topic || true, !false, etc.-->
+        <module name="SimplifyBooleanExpression"/>
+        <module name="OneStatementPerLine"/>
+        <module name="UnnecessaryParentheses"/>
+        <!--Checks for over-complicated boolean return statements. For example the following code-->
+        <module name="SimplifyBooleanReturn"/>
+
+        <!--Check that the default is after all the cases in producerGroup switch statement-->
+        <module name="DefaultComesLast"/>
+        <!--Detects empty statements (standalone ";" semicolon)-->
+        <module name="EmptyStatement"/>
+        <!--Checks that long constants are defined with an upper ell-->
+        <module name="UpperEll"/>
+        <module name="ConstantName">
+            <property name="format" value="(^[A-Z][A-Z0-9]*(_[A-Z0-9]+)*$)|(^log$)"/>
+        </module>
+        <!--Checks that local, non-final variable names conform to producerGroup format specified by the format property-->
+        <module name="LocalVariableName"/>
+        <!--Validates identifiers for local, final variables, including catch parameters-->
+        <module name="LocalFinalVariableName"/>
+        <!--Validates identifiers for non-static fields-->
+        <module name="MemberName"/>
+        <!--Validates identifiers for class type parameters-->
+        <module name="ClassTypeParameterName">
+            <property name="format" value="^[A-Z0-9]*$"/>
+        </module>
+        <!--Validates identifiers for method type parameters-->
+        <module name="MethodTypeParameterName">
+            <property name="format" value="^[A-Z0-9]*$"/>
+        </module>
+        <module name="PackageName"/>
+        <module name="ParameterName"/>
+        <module name="StaticVariableName"/>
+        <module name="TypeName"/>
+        <!--Checks that there are no import statements that use the * notation-->
+        <module name="AvoidStarImport"/>
+
+        <!--whitespace-->
+        <module name="GenericWhitespace"/>
+        <module name="NoWhitespaceBefore"/>
+        <module name="WhitespaceAfter"/>
+        <module name="NoWhitespaceAfter"/>
+        <module name="WhitespaceAround">
+            <property name="allowEmptyConstructors" value="true"/>
+            <property name="allowEmptyMethods" value="true"/>
+        </module>
+        <module name="Indentation"/>
+        <module name="MethodParamPad"/>
+        <module name="ParenPad"/>
+        <module name="TypecastParenPad"/>
+    </module>
+</module>
diff --git a/rocketmq-jms/style/rmq_codeStyle.xml b/rocketmq-jms/style/rmq_codeStyle.xml
new file mode 100644
index 0000000..cd95ee6
--- /dev/null
+++ b/rocketmq-jms/style/rmq_codeStyle.xml
@@ -0,0 +1,157 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<code_scheme name="rocketmq">
+    <option name="USE_SAME_INDENTS" value="true"/>
+    <option name="IGNORE_SAME_INDENTS_FOR_LANGUAGES" value="true"/>
+    <option name="OTHER_INDENT_OPTIONS">
+        <value>
+            <option name="INDENT_SIZE" value="4"/>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+            <option name="TAB_SIZE" value="4"/>
+            <option name="USE_TAB_CHARACTER" value="false"/>
+            <option name="SMART_TABS" value="false"/>
+            <option name="LABEL_INDENT_SIZE" value="0"/>
+            <option name="LABEL_INDENT_ABSOLUTE" value="false"/>
+            <option name="USE_RELATIVE_INDENTS" value="false"/>
+        </value>
+    </option>
+    <option name="PREFER_LONGER_NAMES" value="false"/>
+    <option name="CLASS_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/>
+    <option name="NAMES_COUNT_TO_USE_IMPORT_ON_DEMAND" value="1000"/>
+    <option name="PACKAGES_TO_USE_IMPORT_ON_DEMAND">
+        <value/>
+    </option>
+    <option name="IMPORT_LAYOUT_TABLE">
+        <value>
+            <package name="" withSubpackages="true" static="false"/>
+            <emptyLine/>
+            <package name="" withSubpackages="true" static="true"/>
+        </value>
+    </option>
+    <option name="JD_ALIGN_PARAM_COMMENTS" value="false"/>
+    <option name="JD_ALIGN_EXCEPTION_COMMENTS" value="false"/>
+    <option name="JD_P_AT_EMPTY_LINES" value="false"/>
+    <option name="JD_KEEP_INVALID_TAGS" value="false"/>
+    <option name="JD_DO_NOT_WRAP_ONE_LINE_COMMENTS" value="true"/>
+    <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+    <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+    <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+    <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+    <option name="ELSE_ON_NEW_LINE" value="true"/>
+    <option name="WHILE_ON_NEW_LINE" value="true"/>
+    <option name="CATCH_ON_NEW_LINE" value="true"/>
+    <option name="FINALLY_ON_NEW_LINE" value="true"/>
+    <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+    <option name="ALIGN_MULTILINE_FOR" value="false"/>
+    <option name="SPACE_AFTER_TYPE_CAST" value="false"/>
+    <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/>
+    <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+    <option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/>
+    <option name="LABELED_STATEMENT_WRAP" value="1"/>
+    <option name="WRAP_COMMENTS" value="true"/>
+    <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+    <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+    <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+    <JavaCodeStyleSettings>
+        <option name="CLASS_NAMES_IN_JAVADOC" value="3"/>
+    </JavaCodeStyleSettings>
+    <XML>
+        <option name="XML_LEGACY_SETTINGS_IMPORTED" value="true"/>
+    </XML>
+    <ADDITIONAL_INDENT_OPTIONS fileType="haml">
+        <option name="INDENT_SIZE" value="2"/>
+    </ADDITIONAL_INDENT_OPTIONS>
+    <codeStyleSettings language="Groovy">
+        <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+        <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="ELSE_ON_NEW_LINE" value="true"/>
+        <option name="CATCH_ON_NEW_LINE" value="true"/>
+        <option name="FINALLY_ON_NEW_LINE" value="true"/>
+        <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+        <option name="ALIGN_MULTILINE_FOR" value="false"/>
+        <option name="SPACE_AFTER_TYPE_CAST" value="false"/>
+        <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+        <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+        <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+        <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+        <indentOptions>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+    <codeStyleSettings language="HOCON">
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+    </codeStyleSettings>
+    <codeStyleSettings language="JAVA">
+        <option name="KEEP_CONTROL_STATEMENT_IN_ONE_LINE" value="false"/>
+        <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="ELSE_ON_NEW_LINE" value="true"/>
+        <option name="WHILE_ON_NEW_LINE" value="true"/>
+        <option name="CATCH_ON_NEW_LINE" value="true"/>
+        <option name="FINALLY_ON_NEW_LINE" value="true"/>
+        <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+        <option name="ALIGN_MULTILINE_FOR" value="false"/>
+        <option name="SPACE_AFTER_TYPE_CAST" value="false"/>
+        <option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true"/>
+        <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+        <option name="ARRAY_INITIALIZER_LBRACE_ON_NEXT_LINE" value="true"/>
+        <option name="LABELED_STATEMENT_WRAP" value="1"/>
+        <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+        <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+        <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+        <indentOptions>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+    <codeStyleSettings language="JSON">
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+    </codeStyleSettings>
+    <codeStyleSettings language="Scala">
+        <option name="KEEP_BLANK_LINES_IN_DECLARATIONS" value="1"/>
+        <option name="KEEP_BLANK_LINES_IN_CODE" value="1"/>
+        <option name="KEEP_BLANK_LINES_BEFORE_RBRACE" value="1"/>
+        <option name="ELSE_ON_NEW_LINE" value="true"/>
+        <option name="WHILE_ON_NEW_LINE" value="true"/>
+        <option name="CATCH_ON_NEW_LINE" value="true"/>
+        <option name="FINALLY_ON_NEW_LINE" value="true"/>
+        <option name="ALIGN_MULTILINE_PARAMETERS" value="false"/>
+        <option name="ALIGN_MULTILINE_FOR" value="false"/>
+        <option name="METHOD_PARAMETERS_WRAP" value="1"/>
+        <option name="METHOD_ANNOTATION_WRAP" value="1"/>
+        <option name="CLASS_ANNOTATION_WRAP" value="1"/>
+        <option name="FIELD_ANNOTATION_WRAP" value="1"/>
+        <option name="PARENT_SETTINGS_INSTALLED" value="true"/>
+        <indentOptions>
+            <option name="INDENT_SIZE" value="4"/>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+            <option name="TAB_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+    <codeStyleSettings language="XML">
+        <indentOptions>
+            <option name="CONTINUATION_INDENT_SIZE" value="4"/>
+        </indentOptions>
+    </codeStyleSettings>
+</code_scheme>
\ No newline at end of file