[EDGENT-447][Connectors] contribute a RabbitMQ connector
diff --git a/connectors/pom.xml b/connectors/pom.xml
index 9190cfa..edc2e5b 100644
--- a/connectors/pom.xml
+++ b/connectors/pom.xml
@@ -49,6 +49,7 @@
     <module>websocket-jetty</module>
     <module>websocket-misc</module>
     <module>websocket-server</module>
+    <module>rabbitmq</module>
   </modules>
 
 </project>
diff --git a/connectors/rabbitmq/pom.xml b/connectors/rabbitmq/pom.xml
new file mode 100644
index 0000000..9c67fad
--- /dev/null
+++ b/connectors/rabbitmq/pom.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>edgent-connectors</artifactId>
+    <groupId>org.apache.edgent</groupId>
+    <version>1.3.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>edgent-connectors-rabbitmq</artifactId>
+
+  <name>Apache Edgent (Java 8): Connectors: RabbitMQ</name>
+
+  <properties>
+    <remote-resources-maven-plugin.remote-resources.dir>../../src/main/ibm-remote-resources</remote-resources-maven-plugin.remote-resources.dir>
+    <edgent.version></edgent.version>
+    <rabbitmq.version>5.1.2</rabbitmq.version>
+  </properties>
+
+  <dependencies>
+    <!-- edgent start -->
+    <dependency>
+      <groupId>org.apache.edgent</groupId>
+      <artifactId>edgent-api-topology</artifactId>
+      <version>1.3.0-SNAPSHOT</version>
+    </dependency>
+    <!-- edgent end -->
+
+    <!-- edgent test start -->
+    <dependency>
+      <groupId>org.apache.edgent</groupId>
+      <artifactId>edgent-providers-direct</artifactId>
+      <version>1.3.0-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.edgent</groupId>
+      <artifactId>edgent-providers-direct</artifactId>
+      <version>1.3.0-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.edgent</groupId>
+      <artifactId>edgent-connectors-common</artifactId>
+      <version>1.3.0-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.edgent</groupId>
+      <artifactId>edgent-api-topology</artifactId>
+      <version>1.3.0-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <!-- edgent test end -->
+
+    <!-- rabbitmq start -->
+    <dependency>
+      <groupId>com.rabbitmq</groupId>
+      <artifactId>amqp-client</artifactId>
+      <version>${rabbitmq.version}</version>
+    </dependency>
+    <!-- rabbitmq end -->
+  </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConfigKeyConstants.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConfigKeyConstants.java
new file mode 100644
index 0000000..f35a8fe
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConfigKeyConstants.java
@@ -0,0 +1,92 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.rabbitmq;
+
+/**
+ * Defines all RabbitMQ config key constant items.
+ */
+public class RabbitmqConfigKeyConstants {
+
+    /**
+     * config key for connection URI, eg: amqp://userName:password@hostName:portNumber/virtualHost
+     */
+    public static final String RABBITMQ_CONFIG_KEY_URI = "rabbitmq.connection.uri";
+
+    /**
+     * config key for RabbitMQ server host
+     */
+    public static final String RABBITMQ_CONFIG_KEY_HOST = "rabbitmq.connection.host";
+
+    /**
+     * config key for RabbitMQ server port, default port is : 5672
+     */
+    public static final String RABBITMQ_CONFIG_KEY_PORT = "rabbitmq.connection.port";
+
+    /**
+     * config key for virtual host which used to split multi-users
+     */
+    public static final String RABBITMQ_CONFIG_KEY_VIRTUAL_HOST = "rabbitmq.connection.virtualHost";
+
+    /**
+     * config key for authorization (user name)
+     */
+    public static final String RABBITMQ_CONFIG_KEY_AUTH_NAME = "rabbitmq.connection.authUsername";
+
+    /**
+     * config key for authorization (password)
+     */
+    public static final String RABBITMQ_CONFIG_KEY_AUTH_PASSWORD = "rabbitmq.connection.authPassword";
+
+    /**
+     * config key for specifying whether enable auto recovery or not.
+     */
+    public static final String RABBITMQ_CONFIG_KEY_AUTO_RECOVERY = "rabbitmq.connection.autoRecovery";
+
+    /**
+     * config key for connection timeout
+     */
+    public static final String RABBITMQ_CONFIG_KEY_TIMEOUT = "rabbitmq.connection.timeout";
+
+    /**
+     * config key for connection network recovery interval
+     */
+    public static final String RABBITMQ_CONFIG_KEY_NETWORK_RECOVERY_INTERVAL = "rabbitmq.connection.networkRecoveryInterval";
+
+    /**
+     * config key for connection requested-heartbeat
+     */
+    public static final String RABBITMQ_CONFIG_KEY_REQUESTED_HEARTBEAT = "rabbitmq.connection.requestedHeartbeat";
+
+    /**
+     * config key for specifying whether enable topology recovery or not.
+     */
+    public static final String RABBITMQ_CONFIG_KEY_TOPOLOGY_RECOVERY_ENABLED = "rabbitmq.connection.topologyRecoveryEnabled";
+
+    /**
+     * config key for connection requested channel max num
+     */
+    public static final String RABBITMQ_CONFIG_KEY_REQUESTED_CHANNEL_MAX = "rabbitmq.connection.requestedChannelMax";
+
+    /**
+     * config key for connection requested frame max
+     */
+    public static final String RABBITMQ_CONFIG_KEY_REQUESTED_FRAME_MAX = "rabbitmq.connection.requestedFrameMax";
+
+}
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConsumer.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConsumer.java
new file mode 100644
index 0000000..4c2e881
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConsumer.java
@@ -0,0 +1,87 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.rabbitmq;
+
+import org.apache.edgent.connectors.rabbitmq.runtime.RabbitmqConnector;
+import org.apache.edgent.connectors.rabbitmq.runtime.RabbitmqSubscriber;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+import java.util.Map;
+
+/**
+ * {@code RabbitmqConsumer} is a consumer to consume messages from a RabbitMQ messaging broker
+ * <p>
+ * The connector uses and includes components from the RabbitMQ 3.7.3 release.
+ * It has been successfully tested against 3.7.3.
+ * For more information about RabbitMQ see <a href="http://www.rabbitmq.com/">http://www.rabbitmq.com/</a>
+ * </p>
+ * Smaple use:
+ * <pre>{@code
+ * Map<String, Object> config = new HashMap<>();
+ * config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_HOST, "127.0.0.1");
+ * config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_PORT, 5672);
+ * String queue = "testQueue";
+ *
+ * Topology t = ...
+ *
+ * RabbitmqConsumer consumer = new RabbitmqConsumer(t, () -> configMap);
+ * TStream<String> receivedStream = consumer.subscribe((byte[] bytes) -> new String(bytes), queue);
+ *
+ * //...
+ * }
+ * </pre>
+ */
+public class RabbitmqConsumer {
+
+    private final RabbitmqConnector connector;
+    private final Topology topology;
+
+    /**
+     * Create a consumer connector for consuming tuples from a RabbitMQ queue.
+     * <p>
+     * See the RabbitMQ java client document :
+     * <a href="http://www.rabbitmq.com/api-guide.html">http://www.rabbitmq.com/api-guide.html</a>
+     * The full config option please see RabbitMQ java client API Reference :
+     * < a href="https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/ConnectionFactory.html">ConnectionFactory</>
+     * </p>
+     * @param topology topology to add to
+     * @param config RabbitmqProducer configuration information.
+     */
+    public RabbitmqConsumer(Topology topology, Supplier<Map<String, Object>> config) {
+        this.topology = topology;
+        this.connector = new RabbitmqConnector(config);
+    }
+
+    /**
+     * Subscribe to the specified topics and yield a stream of tuples from the published RabbitMQ records.
+     *
+     * @param toTupleFn A function that yields a tuple from a byte array,
+     * @param queue the specified RabbitMQ queue
+     * @param <T> A function that yields a tuple from a
+     * @return stream of tuples
+     */
+    public <T> TStream<T> subscribe(Function<byte[], T> toTupleFn, String queue) {
+        return topology.events(new RabbitmqSubscriber<>(connector, queue, toTupleFn));
+    }
+
+}
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqProducer.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqProducer.java
new file mode 100644
index 0000000..8437285
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqProducer.java
@@ -0,0 +1,99 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.rabbitmq;
+
+import org.apache.edgent.connectors.rabbitmq.runtime.RabbitmqConnector;
+import org.apache.edgent.connectors.rabbitmq.runtime.RabbitmqPublisher;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+import java.util.Map;
+
+/**
+ * {@code RabbitmqProducer} is a producer to produce messages to a RabbitMQ messaging broker
+ * <p>
+ * The connector uses and includes components from the RabbitMQ 3.7.3 release.
+ * It has been successfully tested against 3.7.3.
+ * For more information about RabbitMQ see <a href="http://www.rabbitmq.com/">http://www.rabbitmq.com/</a>
+ * </p>
+ * Sample use:
+ * <pre>{@code
+ * Map<String, Object> config = new HashMap<>();
+ * config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_HOST, "127.0.0.1");
+ * config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_PORT, 5672);
+ * String queue = "testQueue";
+ *
+ * Topology t = newTopology("testSimple");
+ * RabbitmqProducer producer = new RabbitmqProducer(t, () -> config);
+ *
+ * //TStream<String> stream = ...
+ *
+ * TSink<String> sink = producer.publish(stream, queue, (String s) -> s.getBytes());
+ * }
+ * </pre>
+ */
+public class RabbitmqProducer {
+
+    private final RabbitmqConnector connector;
+    private final Topology topology;
+
+    /**
+     * Create a producer connector for publishing tuples to a RabbitMQ queue.
+     * <p>
+     * See the RabbitMQ java client document :
+     * <a href="http://www.rabbitmq.com/api-guide.html">http://www.rabbitmq.com/api-guide.html</a>
+     * The full config option please see RabbitMQ java client API Reference :
+     * < a href="https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/ConnectionFactory.html">ConnectionFactory</>
+     * </p>
+     * @param topology topology to add to
+     * @param config RabbitmqProducer configuration information.
+     */
+    public RabbitmqProducer(Topology topology, Supplier<Map<String, Object>> config) {
+        this.topology = topology;
+        this.connector = new RabbitmqConnector(config);
+    }
+
+    /**
+     * Publish the stream of tuples to the specified queue.
+     * @param stream The stream to publish
+     * @param queue The specified queue of RabbitMQ
+     * @param msgFn A function that yields the byte[] records from the tuple
+     * @param <T> Tuple type
+     * @return {@link TSink}
+     */
+    public <T> TSink<T> publish(TStream<T> stream, String queue, Function<T, byte[]> msgFn) {
+        return stream.sink(new RabbitmqPublisher<>(connector, queue, msgFn));
+    }
+
+    /**
+     * Publish the stream of tuples to the specified queue.
+     * @param stream The stream to publish
+     * @param queue The specified queue of RabbitMQ
+     * @param msg The string message to publish
+     * @return
+     */
+    public TSink<String> publish(TStream<String> stream, String queue, String msg) {
+        return publish(stream, queue, (String m) -> msg.getBytes());
+    }
+
+}
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/package-info.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/package-info.java
new file mode 100644
index 0000000..49dafd2
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/package-info.java
@@ -0,0 +1,28 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * RabbitMQ stream connector.
+ * <P>
+ * Stream tuples may be published to RabbitMQ queues
+ * and created by subscribing to queues.
+ * For more information about RabbitMQ see
+ * <a href="http://www.rabbitmq.com/">http://www.rabbitmq.com/</a>
+ */
+package org.apache.edgent.connectors.rabbitmq;
\ No newline at end of file
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqConnector.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqConnector.java
new file mode 100644
index 0000000..b83cfb6
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqConnector.java
@@ -0,0 +1,188 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.rabbitmq.runtime;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import org.apache.edgent.function.Supplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_AUTH_NAME;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_AUTH_PASSWORD;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_AUTO_RECOVERY;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_HOST;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_NETWORK_RECOVERY_INTERVAL;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_PORT;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_REQUESTED_CHANNEL_MAX;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_REQUESTED_FRAME_MAX;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_REQUESTED_HEARTBEAT;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_TIMEOUT;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_TOPOLOGY_RECOVERY_ENABLED;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_URI;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_VIRTUAL_HOST;
+
+/**
+ * A connector to an RabbitMQ server.
+ */
+public class RabbitmqConnector implements AutoCloseable {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger logger = LoggerFactory.getLogger(RabbitmqConnector.class);
+
+    private final Supplier<Map<String, Object>> configFn;
+    private ConnectionFactory connectionFactory;
+    private Connection connection;
+    private Channel channel;
+    private String id;
+
+    public RabbitmqConnector(Supplier<Map<String, Object>> configFn) {
+        this.configFn = configFn;
+        initConnection();
+    }
+
+    public synchronized Channel channel() {
+        if (channel == null) {
+            if (connection != null) {
+                try {
+                    channel = connection.createChannel();
+                } catch (IOException e) {
+                    logger.error("IOExcetion occurs when create connection channel {}", e);
+                    throw new RuntimeException(e);
+                } catch (Exception e) {
+                    logger.error("Unknown Exception : {}", e);
+                }
+            } else {
+                logger.error("Inner statue inconformity : the rabbitmq connection is null.");
+                throw new RuntimeException("Inner statue inconformity : the rabbitmq connection is null.");
+            }
+        }
+
+        return channel;
+    }
+
+    @Override
+    public synchronized void close() throws Exception {
+        if (channel != null) {
+            channel.close();
+        }
+
+        if (connection != null) {
+            connection.close();
+        }
+
+        if (connectionFactory != null) {
+            connectionFactory = null;
+        }
+    }
+
+    public String id() {
+        if (id == null) {
+            // include our short object Id
+            id = "RabbitMQ " + toString().substring(toString().indexOf('@') + 1);
+        }
+        return id;
+    }
+
+    private void initConnection() {
+        try {
+            this.connectionFactory = getConnectionFactory();
+            this.connection = connectionFactory.newConnection();
+        } catch (Exception e) {
+            logger.error("{}", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private ConnectionFactory getConnectionFactory() throws Exception {
+        ConnectionFactory connectionFactory = new ConnectionFactory();
+        Map<String, Object> configMap = configFn.get();
+
+        if (configMap.containsKey(RABBITMQ_CONFIG_KEY_URI)) {
+            connectionFactory.setUri(configMap.get(RABBITMQ_CONFIG_KEY_URI).toString());
+        } else {
+            if (!configMap.containsKey(RABBITMQ_CONFIG_KEY_HOST)) {
+                throw new RuntimeException("Missed key : " + RABBITMQ_CONFIG_KEY_HOST);
+            }
+
+            connectionFactory.setHost(configMap.get(RABBITMQ_CONFIG_KEY_HOST).toString());
+
+            if (!configMap.containsKey(RABBITMQ_CONFIG_KEY_PORT)) {
+                throw new RuntimeException("Missed key : " + RABBITMQ_CONFIG_KEY_PORT);
+            }
+
+            connectionFactory.setPort(Integer.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_PORT).toString()));
+        }
+
+        if (configMap.containsKey(RABBITMQ_CONFIG_KEY_VIRTUAL_HOST)) {
+            connectionFactory.setVirtualHost(configMap.get(RABBITMQ_CONFIG_KEY_VIRTUAL_HOST).toString());
+        }
+
+        if (configMap.containsKey(RABBITMQ_CONFIG_KEY_AUTH_NAME)) {
+            connectionFactory.setUsername(configMap.get(RABBITMQ_CONFIG_KEY_AUTH_NAME).toString());
+        }
+
+        if (configMap.containsKey(RABBITMQ_CONFIG_KEY_AUTH_PASSWORD)) {
+            connectionFactory.setPassword(configMap.get(RABBITMQ_CONFIG_KEY_AUTH_PASSWORD).toString());
+        }
+
+        if (configMap.containsKey(RABBITMQ_CONFIG_KEY_AUTO_RECOVERY)) {
+            connectionFactory.setAutomaticRecoveryEnabled(
+                Boolean.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_AUTO_RECOVERY).toString()));
+        }
+
+        if (configMap.containsKey(RABBITMQ_CONFIG_KEY_TIMEOUT)) {
+            connectionFactory.setConnectionTimeout(Integer.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_TIMEOUT).toString()));
+        }
+
+        if (configMap.containsKey(RABBITMQ_CONFIG_KEY_NETWORK_RECOVERY_INTERVAL)) {
+            connectionFactory.setNetworkRecoveryInterval(
+                Integer.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_NETWORK_RECOVERY_INTERVAL).toString()));
+        }
+
+        if (configMap.containsKey(RABBITMQ_CONFIG_KEY_REQUESTED_HEARTBEAT)) {
+            connectionFactory.setRequestedHeartbeat(
+                Integer.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_REQUESTED_HEARTBEAT).toString()));
+        }
+
+        if (configMap.containsKey(RABBITMQ_CONFIG_KEY_TOPOLOGY_RECOVERY_ENABLED)) {
+            connectionFactory.setTopologyRecoveryEnabled(
+                Boolean.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_TOPOLOGY_RECOVERY_ENABLED).toString()));
+        }
+
+        if (configMap.containsKey(RABBITMQ_CONFIG_KEY_REQUESTED_CHANNEL_MAX)) {
+            connectionFactory.setRequestedChannelMax(
+                Integer.parseInt(configMap.get(RABBITMQ_CONFIG_KEY_REQUESTED_CHANNEL_MAX).toString()));
+        }
+
+        if (configMap.containsKey(RABBITMQ_CONFIG_KEY_REQUESTED_FRAME_MAX)) {
+            connectionFactory.setRequestedChannelMax(
+                Integer.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_REQUESTED_FRAME_MAX).toString()));
+        }
+
+        return connectionFactory;
+    }
+
+
+}
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqPublisher.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqPublisher.java
new file mode 100644
index 0000000..67c2d45
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqPublisher.java
@@ -0,0 +1,74 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.rabbitmq.runtime;
+
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * a publisher for RabbitMQ connector
+ */
+public class RabbitmqPublisher<T> implements Consumer<T>, AutoCloseable {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger logger = LoggerFactory.getLogger(RabbitmqPublisher.class);
+
+    private final RabbitmqConnector connector;
+    private final Function<T, byte[]> msgFn;
+    private final String queue;
+    private String id;
+
+    public RabbitmqPublisher(RabbitmqConnector connector, String queue, Function<T, byte[]> msgFn) {
+        this.connector = connector;
+        this.queue = queue;
+        this.msgFn = msgFn;
+    }
+
+    @Override
+    public synchronized void close() throws Exception {
+        logger.info("{} is closing.", id());
+        connector.close();
+        logger.info("{} is closed.", id());
+    }
+
+    @Override
+    public void accept(T value) {
+        byte[] msg = msgFn.apply(value);
+        try {
+            connector.channel().basicPublish("", queue, null, msg);
+        } catch (IOException e) {
+            logger.error("publish exception : {}", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public String id() {
+        if (id == null) {
+            // include our short object Id
+            id = connector.id() + " PUB " + toString().substring(toString().indexOf('@') + 1);
+        }
+
+        return id;
+    }
+}
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqSubscriber.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqSubscriber.java
new file mode 100644
index 0000000..40c2d45
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqSubscriber.java
@@ -0,0 +1,111 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.rabbitmq.runtime;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * a subscriber for RabbitMQ connector
+ */
+public class RabbitmqSubscriber<T> implements Consumer<Consumer<T>>, AutoCloseable {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger logger = LoggerFactory.getLogger(RabbitmqSubscriber.class);
+
+    private final RabbitmqConnector connector;
+    private Function<byte[], T> toTupleFn;
+    private Consumer<T> eventSubmitter;
+    private ExecutorService executor;
+    private String queue;
+    private String id;
+
+    public RabbitmqSubscriber(RabbitmqConnector connector, String queue, Function<byte[], T> toTupleFn) {
+        this.connector = connector;
+        this.queue = queue;
+        this.toTupleFn = toTupleFn;
+    }
+
+    @Override
+    public synchronized void close() throws Exception {
+        logger.info("{} is closing.", id());
+        if (executor != null && !executor.isShutdown()) {
+            executor.shutdownNow();
+        }
+
+        connector.close();
+        logger.info("{} is closed", id());
+    }
+
+    @Override
+    public void accept(Consumer<T> eventSubmitter) {
+        this.eventSubmitter = eventSubmitter;
+
+        executor = Executors.newFixedThreadPool(1);
+
+        executor.submit(() -> {
+            boolean autoAck = false;
+            try {
+                connector.channel().basicConsume(queue, autoAck,
+                    new DefaultConsumer(connector.channel()) {
+                        @Override
+                        public void handleDelivery(String consumerTag,
+                                                   Envelope envelope,
+                                                   AMQP.BasicProperties properties,
+                                                   byte[] body)
+                            throws IOException {
+                            long deliveryTag = envelope.getDeliveryTag();
+
+                            acceptCallback(body);
+
+                            connector.channel().basicAck(deliveryTag, false);
+                        }
+                    });
+            } catch (IOException e) {
+                logger.error("Consumer exception : {}", e);
+            }
+        });
+
+    }
+
+    private void acceptCallback(byte[] msg) {
+        T tuple = toTupleFn.apply(msg);
+        eventSubmitter.accept(tuple);
+    }
+
+    public String id() {
+        if (id == null) {
+            // include our short object Id
+            id = connector.id() + " SUB " + toString().substring(toString().indexOf('@') + 1);
+        }
+
+        return id;
+    }
+
+}
diff --git a/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsGlobalTestManual.java b/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsGlobalTestManual.java
new file mode 100644
index 0000000..9565bf9
--- /dev/null
+++ b/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsGlobalTestManual.java
@@ -0,0 +1,38 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.test.connectors.rabbitmq;
+
+/**
+ * RabbitmqStreams connector globalization tests.
+ */
+public class RabbitmqStreamsGlobalTestManual extends RabbitmqStreamsTestManual {
+
+    private static final String globalMsg1 = "你好";
+    private static final String globalMsg2 = "你在嗎";
+
+    public String getMsg1() {
+        return globalMsg1;
+    }
+
+    public String getMsg2() {
+        return globalMsg2;
+    }
+
+}
diff --git a/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsTestManual.java b/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsTestManual.java
new file mode 100644
index 0000000..02ea0bc
--- /dev/null
+++ b/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsTestManual.java
@@ -0,0 +1,118 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.test.connectors.rabbitmq;
+
+import org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants;
+import org.apache.edgent.connectors.rabbitmq.RabbitmqConsumer;
+import org.apache.edgent.connectors.rabbitmq.RabbitmqProducer;
+import org.apache.edgent.test.connectors.common.ConnectorTestBase;
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.plumbing.PlumbingStreams;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * A {@link RabbitmqConsumer} manual test case.
+ * Please follow there steps
+ *
+ * step 1 :
+ * Install RabbitMQ server.
+ * For Mac os x, you can use homebrew to install it, the simple command is : `brew install rabbitmq`
+ * For other system, please follow RabbitMQ's offical document :
+ *  <a href="http://www.rabbitmq.com/download.html">http://www.rabbitmq.com/download.html</a>
+ *
+ * step 2 :
+ * Start the RabbitMQ server.
+ * For Mac os x, if you install it with homebrew, you can start it with : `brew services start rabbitmq`
+ * For other system, please follow RabbitMQ's offical document :
+ *  <a href="http://www.rabbitmq.com/download.html">http://www.rabbitmq.com/download.html</a>
+ * Note : the default port which RabbitMQ server listen is 5672
+ *        and it will start a web console which listen port 15672
+ *        so you can access it with your web browser, just enter : http://localhost:15672/
+ *        you will see a authorization page, the default user name and password are both `guest`
+ *
+ * step 3 :
+ * Create a test queue use the web console.
+ * after starting the server, you can access `http://localhost:15672/#/queues`
+ * then you will find a button named `Add a new queue`, click it and input `testQueue` into `name` field
+ * and click `Add queue` button to create the queue.
+ *
+ * step 4 :
+ * run this test case to test the publish and subscribe method.
+ */
+public class RabbitmqStreamsTestManual extends ConnectorTestBase {
+
+    private static final int PUB_DELAY_MSEC = 15*1000; // have seen 12sec 1st test's consumer startup delay
+    private static final int SEC_TIMEOUT = 20;
+
+    private final String msg1 = "Hello";
+    private final String msg2 = "Are you there?";
+
+    public String getMsg1() {
+        return msg1;
+    }
+
+    public String getMsg2() {
+        return msg2;
+    }
+
+    private Map<String, Object> initRabbitmqConfig() {
+        Map<String, Object> config = new HashMap<>();
+        config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_HOST, "127.0.0.1");
+        config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_PORT, 5672);
+
+        return config;
+    }
+
+    @Test
+    public void testSimple() throws Exception {
+        Topology t = newTopology("testSimple");
+        Map<String, Object> configMap = initRabbitmqConfig();
+        MsgGenerator generator = new MsgGenerator(t.getName());
+        String queue = "testQueue";
+
+        List<String> msgs = createMsgs(generator, queue, getMsg1(), getMsg2());
+
+        TStream<String> stream = PlumbingStreams.blockingOneShotDelay(
+            t.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
+
+        RabbitmqConsumer consumer = new RabbitmqConsumer(t, () -> configMap);
+
+        TStream<String> receivedStream = consumer.subscribe((byte[] bytes) -> new String(bytes), queue);
+
+        RabbitmqProducer producer = new RabbitmqProducer(t, () -> configMap);
+
+        TSink<String> sink = producer.publish(stream, queue, (String s) -> s.getBytes());
+
+        completeAndValidate("", t, receivedStream, generator, SEC_TIMEOUT, msgs.toArray(new String[0]));
+
+        assertNotNull(sink);
+    }
+
+
+}