Merge branch 'EDGENT-448' of https://github.com/yanghua/incubator-edgent into develop
diff --git a/connectors/pom.xml b/connectors/pom.xml
index 9190cfa..edc2e5b 100644
--- a/connectors/pom.xml
+++ b/connectors/pom.xml
@@ -49,6 +49,7 @@
     <module>websocket-jetty</module>
     <module>websocket-misc</module>
     <module>websocket-server</module>
+    <module>rabbitmq</module>
   </modules>
 
 </project>
diff --git a/connectors/rabbitmq/pom.xml b/connectors/rabbitmq/pom.xml
new file mode 100644
index 0000000..a648b61
--- /dev/null
+++ b/connectors/rabbitmq/pom.xml
@@ -0,0 +1,88 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>edgent-connectors</artifactId>
+    <groupId>org.apache.edgent</groupId>
+    <version>1.3.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>edgent-connectors-rabbitmq</artifactId>
+
+  <name>Apache Edgent (Java 8): Connectors: RabbitMQ</name>
+
+  <properties>
+    <remote-resources-maven-plugin.remote-resources.dir>../../src/main/ibm-remote-resources</remote-resources-maven-plugin.remote-resources.dir>
+    <edgent.version></edgent.version>
+    <rabbitmq.version>5.1.2</rabbitmq.version>
+  </properties>
+
+  <dependencies>
+    <!-- edgent start -->
+    <dependency>
+      <groupId>org.apache.edgent</groupId>
+      <artifactId>edgent-api-topology</artifactId>
+      <version>1.3.0-SNAPSHOT</version>
+    </dependency>
+    <!-- edgent end -->
+
+    <!-- edgent test start -->
+    <dependency>
+      <groupId>org.apache.edgent</groupId>
+      <artifactId>edgent-providers-direct</artifactId>
+      <version>1.3.0-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.edgent</groupId>
+      <artifactId>edgent-providers-direct</artifactId>
+      <version>1.3.0-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.edgent</groupId>
+      <artifactId>edgent-connectors-common</artifactId>
+      <version>1.3.0-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.edgent</groupId>
+      <artifactId>edgent-api-topology</artifactId>
+      <version>1.3.0-SNAPSHOT</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <!-- edgent test end -->
+
+    <!-- rabbitmq start -->
+    <dependency>
+      <groupId>com.rabbitmq</groupId>
+      <artifactId>amqp-client</artifactId>
+      <version>${rabbitmq.version}</version>
+    </dependency>
+    <!-- rabbitmq end -->
+  </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConfigKeyConstants.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConfigKeyConstants.java
new file mode 100644
index 0000000..f35a8fe
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConfigKeyConstants.java
@@ -0,0 +1,92 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.rabbitmq;
+
+/**
+ * Defines all RabbitMQ config key constant items.
+ */
+public class RabbitmqConfigKeyConstants {
+
+    /**
+     * config key for connection URI, eg: amqp://userName:password@hostName:portNumber/virtualHost
+     */
+    public static final String RABBITMQ_CONFIG_KEY_URI = "rabbitmq.connection.uri";
+
+    /**
+     * config key for RabbitMQ server host
+     */
+    public static final String RABBITMQ_CONFIG_KEY_HOST = "rabbitmq.connection.host";
+
+    /**
+     * config key for RabbitMQ server port, default port is : 5672
+     */
+    public static final String RABBITMQ_CONFIG_KEY_PORT = "rabbitmq.connection.port";
+
+    /**
+     * config key for virtual host which used to split multi-users
+     */
+    public static final String RABBITMQ_CONFIG_KEY_VIRTUAL_HOST = "rabbitmq.connection.virtualHost";
+
+    /**
+     * config key for authorization (user name)
+     */
+    public static final String RABBITMQ_CONFIG_KEY_AUTH_NAME = "rabbitmq.connection.authUsername";
+
+    /**
+     * config key for authorization (password)
+     */
+    public static final String RABBITMQ_CONFIG_KEY_AUTH_PASSWORD = "rabbitmq.connection.authPassword";
+
+    /**
+     * config key for specifying whether enable auto recovery or not.
+     */
+    public static final String RABBITMQ_CONFIG_KEY_AUTO_RECOVERY = "rabbitmq.connection.autoRecovery";
+
+    /**
+     * config key for connection timeout
+     */
+    public static final String RABBITMQ_CONFIG_KEY_TIMEOUT = "rabbitmq.connection.timeout";
+
+    /**
+     * config key for connection network recovery interval
+     */
+    public static final String RABBITMQ_CONFIG_KEY_NETWORK_RECOVERY_INTERVAL = "rabbitmq.connection.networkRecoveryInterval";
+
+    /**
+     * config key for connection requested-heartbeat
+     */
+    public static final String RABBITMQ_CONFIG_KEY_REQUESTED_HEARTBEAT = "rabbitmq.connection.requestedHeartbeat";
+
+    /**
+     * config key for specifying whether enable topology recovery or not.
+     */
+    public static final String RABBITMQ_CONFIG_KEY_TOPOLOGY_RECOVERY_ENABLED = "rabbitmq.connection.topologyRecoveryEnabled";
+
+    /**
+     * config key for connection requested channel max num
+     */
+    public static final String RABBITMQ_CONFIG_KEY_REQUESTED_CHANNEL_MAX = "rabbitmq.connection.requestedChannelMax";
+
+    /**
+     * config key for connection requested frame max
+     */
+    public static final String RABBITMQ_CONFIG_KEY_REQUESTED_FRAME_MAX = "rabbitmq.connection.requestedFrameMax";
+
+}
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConsumer.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConsumer.java
new file mode 100644
index 0000000..4c2e881
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConsumer.java
@@ -0,0 +1,87 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.rabbitmq;
+
+import org.apache.edgent.connectors.rabbitmq.runtime.RabbitmqConnector;
+import org.apache.edgent.connectors.rabbitmq.runtime.RabbitmqSubscriber;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+import java.util.Map;
+
+/**
+ * {@code RabbitmqConsumer} is a consumer to consume messages from a RabbitMQ messaging broker
+ * <p>
+ * The connector uses and includes components from the RabbitMQ 3.7.3 release.
+ * It has been successfully tested against 3.7.3.
+ * For more information about RabbitMQ see <a href="http://www.rabbitmq.com/">http://www.rabbitmq.com/</a>
+ * </p>
+ * Smaple use:
+ * <pre>{@code
+ * Map<String, Object> config = new HashMap<>();
+ * config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_HOST, "127.0.0.1");
+ * config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_PORT, 5672);
+ * String queue = "testQueue";
+ *
+ * Topology t = ...
+ *
+ * RabbitmqConsumer consumer = new RabbitmqConsumer(t, () -> configMap);
+ * TStream<String> receivedStream = consumer.subscribe((byte[] bytes) -> new String(bytes), queue);
+ *
+ * //...
+ * }
+ * </pre>
+ */
+public class RabbitmqConsumer {
+
+    private final RabbitmqConnector connector;
+    private final Topology topology;
+
+    /**
+     * Create a consumer connector for consuming tuples from a RabbitMQ queue.
+     * <p>
+     * See the RabbitMQ java client document :
+     * <a href="http://www.rabbitmq.com/api-guide.html">http://www.rabbitmq.com/api-guide.html</a>
+     * The full config option please see RabbitMQ java client API Reference :
+     * < a href="https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/ConnectionFactory.html">ConnectionFactory</>
+     * </p>
+     * @param topology topology to add to
+     * @param config RabbitmqProducer configuration information.
+     */
+    public RabbitmqConsumer(Topology topology, Supplier<Map<String, Object>> config) {
+        this.topology = topology;
+        this.connector = new RabbitmqConnector(config);
+    }
+
+    /**
+     * Subscribe to the specified topics and yield a stream of tuples from the published RabbitMQ records.
+     *
+     * @param toTupleFn A function that yields a tuple from a byte array,
+     * @param queue the specified RabbitMQ queue
+     * @param <T> A function that yields a tuple from a
+     * @return stream of tuples
+     */
+    public <T> TStream<T> subscribe(Function<byte[], T> toTupleFn, String queue) {
+        return topology.events(new RabbitmqSubscriber<>(connector, queue, toTupleFn));
+    }
+
+}
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqProducer.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqProducer.java
new file mode 100644
index 0000000..8437285
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqProducer.java
@@ -0,0 +1,99 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.rabbitmq;
+
+import org.apache.edgent.connectors.rabbitmq.runtime.RabbitmqConnector;
+import org.apache.edgent.connectors.rabbitmq.runtime.RabbitmqPublisher;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+import java.util.Map;
+
+/**
+ * {@code RabbitmqProducer} is a producer to produce messages to a RabbitMQ messaging broker
+ * <p>
+ * The connector uses and includes components from the RabbitMQ 3.7.3 release.
+ * It has been successfully tested against 3.7.3.
+ * For more information about RabbitMQ see <a href="http://www.rabbitmq.com/">http://www.rabbitmq.com/</a>
+ * </p>
+ * Sample use:
+ * <pre>{@code
+ * Map<String, Object> config = new HashMap<>();
+ * config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_HOST, "127.0.0.1");
+ * config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_PORT, 5672);
+ * String queue = "testQueue";
+ *
+ * Topology t = newTopology("testSimple");
+ * RabbitmqProducer producer = new RabbitmqProducer(t, () -> config);
+ *
+ * //TStream<String> stream = ...
+ *
+ * TSink<String> sink = producer.publish(stream, queue, (String s) -> s.getBytes());
+ * }
+ * </pre>
+ */
+public class RabbitmqProducer {
+
+    private final RabbitmqConnector connector;
+    private final Topology topology;
+
+    /**
+     * Create a producer connector for publishing tuples to a RabbitMQ queue.
+     * <p>
+     * See the RabbitMQ java client document :
+     * <a href="http://www.rabbitmq.com/api-guide.html">http://www.rabbitmq.com/api-guide.html</a>
+     * The full config option please see RabbitMQ java client API Reference :
+     * < a href="https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/ConnectionFactory.html">ConnectionFactory</>
+     * </p>
+     * @param topology topology to add to
+     * @param config RabbitmqProducer configuration information.
+     */
+    public RabbitmqProducer(Topology topology, Supplier<Map<String, Object>> config) {
+        this.topology = topology;
+        this.connector = new RabbitmqConnector(config);
+    }
+
+    /**
+     * Publish the stream of tuples to the specified queue.
+     * @param stream The stream to publish
+     * @param queue The specified queue of RabbitMQ
+     * @param msgFn A function that yields the byte[] records from the tuple
+     * @param <T> Tuple type
+     * @return {@link TSink}
+     */
+    public <T> TSink<T> publish(TStream<T> stream, String queue, Function<T, byte[]> msgFn) {
+        return stream.sink(new RabbitmqPublisher<>(connector, queue, msgFn));
+    }
+
+    /**
+     * Publish the stream of tuples to the specified queue.
+     * @param stream The stream to publish
+     * @param queue The specified queue of RabbitMQ
+     * @param msg The string message to publish
+     * @return
+     */
+    public TSink<String> publish(TStream<String> stream, String queue, String msg) {
+        return publish(stream, queue, (String m) -> msg.getBytes());
+    }
+
+}
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/package-info.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/package-info.java
new file mode 100644
index 0000000..49dafd2
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/package-info.java
@@ -0,0 +1,28 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * RabbitMQ stream connector.
+ * <P>
+ * Stream tuples may be published to RabbitMQ queues
+ * and created by subscribing to queues.
+ * For more information about RabbitMQ see
+ * <a href="http://www.rabbitmq.com/">http://www.rabbitmq.com/</a>
+ */
+package org.apache.edgent.connectors.rabbitmq;
\ No newline at end of file
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqConnector.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqConnector.java
new file mode 100644
index 0000000..b83cfb6
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqConnector.java
@@ -0,0 +1,188 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.rabbitmq.runtime;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import org.apache.edgent.function.Supplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_AUTH_NAME;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_AUTH_PASSWORD;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_AUTO_RECOVERY;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_HOST;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_NETWORK_RECOVERY_INTERVAL;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_PORT;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_REQUESTED_CHANNEL_MAX;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_REQUESTED_FRAME_MAX;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_REQUESTED_HEARTBEAT;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_TIMEOUT;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_TOPOLOGY_RECOVERY_ENABLED;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_URI;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_VIRTUAL_HOST;
+
+/**
+ * A connector to an RabbitMQ server.
+ */
+public class RabbitmqConnector implements AutoCloseable {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger logger = LoggerFactory.getLogger(RabbitmqConnector.class);
+
+    private final Supplier<Map<String, Object>> configFn;
+    private ConnectionFactory connectionFactory;
+    private Connection connection;
+    private Channel channel;
+    private String id;
+
+    public RabbitmqConnector(Supplier<Map<String, Object>> configFn) {
+        this.configFn = configFn;
+        initConnection();
+    }
+
+    public synchronized Channel channel() {
+        if (channel == null) {
+            if (connection != null) {
+                try {
+                    channel = connection.createChannel();
+                } catch (IOException e) {
+                    logger.error("IOExcetion occurs when create connection channel {}", e);
+                    throw new RuntimeException(e);
+                } catch (Exception e) {
+                    logger.error("Unknown Exception : {}", e);
+                }
+            } else {
+                logger.error("Inner statue inconformity : the rabbitmq connection is null.");
+                throw new RuntimeException("Inner statue inconformity : the rabbitmq connection is null.");
+            }
+        }
+
+        return channel;
+    }
+
+    @Override
+    public synchronized void close() throws Exception {
+        if (channel != null) {
+            channel.close();
+        }
+
+        if (connection != null) {
+            connection.close();
+        }
+
+        if (connectionFactory != null) {
+            connectionFactory = null;
+        }
+    }
+
+    public String id() {
+        if (id == null) {
+            // include our short object Id
+            id = "RabbitMQ " + toString().substring(toString().indexOf('@') + 1);
+        }
+        return id;
+    }
+
+    private void initConnection() {
+        try {
+            this.connectionFactory = getConnectionFactory();
+            this.connection = connectionFactory.newConnection();
+        } catch (Exception e) {
+            logger.error("{}", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private ConnectionFactory getConnectionFactory() throws Exception {
+        ConnectionFactory connectionFactory = new ConnectionFactory();
+        Map<String, Object> configMap = configFn.get();
+
+        if (configMap.containsKey(RABBITMQ_CONFIG_KEY_URI)) {
+            connectionFactory.setUri(configMap.get(RABBITMQ_CONFIG_KEY_URI).toString());
+        } else {
+            if (!configMap.containsKey(RABBITMQ_CONFIG_KEY_HOST)) {
+                throw new RuntimeException("Missed key : " + RABBITMQ_CONFIG_KEY_HOST);
+            }
+
+            connectionFactory.setHost(configMap.get(RABBITMQ_CONFIG_KEY_HOST).toString());
+
+            if (!configMap.containsKey(RABBITMQ_CONFIG_KEY_PORT)) {
+                throw new RuntimeException("Missed key : " + RABBITMQ_CONFIG_KEY_PORT);
+            }
+
+            connectionFactory.setPort(Integer.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_PORT).toString()));
+        }
+
+        if (configMap.containsKey(RABBITMQ_CONFIG_KEY_VIRTUAL_HOST)) {
+            connectionFactory.setVirtualHost(configMap.get(RABBITMQ_CONFIG_KEY_VIRTUAL_HOST).toString());
+        }
+
+        if (configMap.containsKey(RABBITMQ_CONFIG_KEY_AUTH_NAME)) {
+            connectionFactory.setUsername(configMap.get(RABBITMQ_CONFIG_KEY_AUTH_NAME).toString());
+        }
+
+        if (configMap.containsKey(RABBITMQ_CONFIG_KEY_AUTH_PASSWORD)) {
+            connectionFactory.setPassword(configMap.get(RABBITMQ_CONFIG_KEY_AUTH_PASSWORD).toString());
+        }
+
+        if (configMap.containsKey(RABBITMQ_CONFIG_KEY_AUTO_RECOVERY)) {
+            connectionFactory.setAutomaticRecoveryEnabled(
+                Boolean.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_AUTO_RECOVERY).toString()));
+        }
+
+        if (configMap.containsKey(RABBITMQ_CONFIG_KEY_TIMEOUT)) {
+            connectionFactory.setConnectionTimeout(Integer.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_TIMEOUT).toString()));
+        }
+
+        if (configMap.containsKey(RABBITMQ_CONFIG_KEY_NETWORK_RECOVERY_INTERVAL)) {
+            connectionFactory.setNetworkRecoveryInterval(
+                Integer.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_NETWORK_RECOVERY_INTERVAL).toString()));
+        }
+
+        if (configMap.containsKey(RABBITMQ_CONFIG_KEY_REQUESTED_HEARTBEAT)) {
+            connectionFactory.setRequestedHeartbeat(
+                Integer.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_REQUESTED_HEARTBEAT).toString()));
+        }
+
+        if (configMap.containsKey(RABBITMQ_CONFIG_KEY_TOPOLOGY_RECOVERY_ENABLED)) {
+            connectionFactory.setTopologyRecoveryEnabled(
+                Boolean.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_TOPOLOGY_RECOVERY_ENABLED).toString()));
+        }
+
+        if (configMap.containsKey(RABBITMQ_CONFIG_KEY_REQUESTED_CHANNEL_MAX)) {
+            connectionFactory.setRequestedChannelMax(
+                Integer.parseInt(configMap.get(RABBITMQ_CONFIG_KEY_REQUESTED_CHANNEL_MAX).toString()));
+        }
+
+        if (configMap.containsKey(RABBITMQ_CONFIG_KEY_REQUESTED_FRAME_MAX)) {
+            connectionFactory.setRequestedChannelMax(
+                Integer.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_REQUESTED_FRAME_MAX).toString()));
+        }
+
+        return connectionFactory;
+    }
+
+
+}
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqPublisher.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqPublisher.java
new file mode 100644
index 0000000..67c2d45
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqPublisher.java
@@ -0,0 +1,74 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.rabbitmq.runtime;
+
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * a publisher for RabbitMQ connector
+ */
+public class RabbitmqPublisher<T> implements Consumer<T>, AutoCloseable {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger logger = LoggerFactory.getLogger(RabbitmqPublisher.class);
+
+    private final RabbitmqConnector connector;
+    private final Function<T, byte[]> msgFn;
+    private final String queue;
+    private String id;
+
+    public RabbitmqPublisher(RabbitmqConnector connector, String queue, Function<T, byte[]> msgFn) {
+        this.connector = connector;
+        this.queue = queue;
+        this.msgFn = msgFn;
+    }
+
+    @Override
+    public synchronized void close() throws Exception {
+        logger.info("{} is closing.", id());
+        connector.close();
+        logger.info("{} is closed.", id());
+    }
+
+    @Override
+    public void accept(T value) {
+        byte[] msg = msgFn.apply(value);
+        try {
+            connector.channel().basicPublish("", queue, null, msg);
+        } catch (IOException e) {
+            logger.error("publish exception : {}", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    public String id() {
+        if (id == null) {
+            // include our short object Id
+            id = connector.id() + " PUB " + toString().substring(toString().indexOf('@') + 1);
+        }
+
+        return id;
+    }
+}
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqSubscriber.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqSubscriber.java
new file mode 100644
index 0000000..40c2d45
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqSubscriber.java
@@ -0,0 +1,111 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.rabbitmq.runtime;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * a subscriber for RabbitMQ connector
+ */
+public class RabbitmqSubscriber<T> implements Consumer<Consumer<T>>, AutoCloseable {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger logger = LoggerFactory.getLogger(RabbitmqSubscriber.class);
+
+    private final RabbitmqConnector connector;
+    private Function<byte[], T> toTupleFn;
+    private Consumer<T> eventSubmitter;
+    private ExecutorService executor;
+    private String queue;
+    private String id;
+
+    public RabbitmqSubscriber(RabbitmqConnector connector, String queue, Function<byte[], T> toTupleFn) {
+        this.connector = connector;
+        this.queue = queue;
+        this.toTupleFn = toTupleFn;
+    }
+
+    @Override
+    public synchronized void close() throws Exception {
+        logger.info("{} is closing.", id());
+        if (executor != null && !executor.isShutdown()) {
+            executor.shutdownNow();
+        }
+
+        connector.close();
+        logger.info("{} is closed", id());
+    }
+
+    @Override
+    public void accept(Consumer<T> eventSubmitter) {
+        this.eventSubmitter = eventSubmitter;
+
+        executor = Executors.newFixedThreadPool(1);
+
+        executor.submit(() -> {
+            boolean autoAck = false;
+            try {
+                connector.channel().basicConsume(queue, autoAck,
+                    new DefaultConsumer(connector.channel()) {
+                        @Override
+                        public void handleDelivery(String consumerTag,
+                                                   Envelope envelope,
+                                                   AMQP.BasicProperties properties,
+                                                   byte[] body)
+                            throws IOException {
+                            long deliveryTag = envelope.getDeliveryTag();
+
+                            acceptCallback(body);
+
+                            connector.channel().basicAck(deliveryTag, false);
+                        }
+                    });
+            } catch (IOException e) {
+                logger.error("Consumer exception : {}", e);
+            }
+        });
+
+    }
+
+    private void acceptCallback(byte[] msg) {
+        T tuple = toTupleFn.apply(msg);
+        eventSubmitter.accept(tuple);
+    }
+
+    public String id() {
+        if (id == null) {
+            // include our short object Id
+            id = connector.id() + " SUB " + toString().substring(toString().indexOf('@') + 1);
+        }
+
+        return id;
+    }
+
+}
diff --git a/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsGlobalTestManual.java b/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsGlobalTestManual.java
new file mode 100644
index 0000000..9565bf9
--- /dev/null
+++ b/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsGlobalTestManual.java
@@ -0,0 +1,38 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.test.connectors.rabbitmq;
+
+/**
+ * RabbitmqStreams connector globalization tests.
+ */
+public class RabbitmqStreamsGlobalTestManual extends RabbitmqStreamsTestManual {
+
+    private static final String globalMsg1 = "你好";
+    private static final String globalMsg2 = "你在嗎";
+
+    public String getMsg1() {
+        return globalMsg1;
+    }
+
+    public String getMsg2() {
+        return globalMsg2;
+    }
+
+}
diff --git a/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsTestManual.java b/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsTestManual.java
new file mode 100644
index 0000000..02ea0bc
--- /dev/null
+++ b/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsTestManual.java
@@ -0,0 +1,118 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.test.connectors.rabbitmq;
+
+import org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants;
+import org.apache.edgent.connectors.rabbitmq.RabbitmqConsumer;
+import org.apache.edgent.connectors.rabbitmq.RabbitmqProducer;
+import org.apache.edgent.test.connectors.common.ConnectorTestBase;
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.plumbing.PlumbingStreams;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * A {@link RabbitmqConsumer} manual test case.
+ * Please follow there steps
+ *
+ * step 1 :
+ * Install RabbitMQ server.
+ * For Mac os x, you can use homebrew to install it, the simple command is : `brew install rabbitmq`
+ * For other system, please follow RabbitMQ's offical document :
+ *  <a href="http://www.rabbitmq.com/download.html">http://www.rabbitmq.com/download.html</a>
+ *
+ * step 2 :
+ * Start the RabbitMQ server.
+ * For Mac os x, if you install it with homebrew, you can start it with : `brew services start rabbitmq`
+ * For other system, please follow RabbitMQ's offical document :
+ *  <a href="http://www.rabbitmq.com/download.html">http://www.rabbitmq.com/download.html</a>
+ * Note : the default port which RabbitMQ server listen is 5672
+ *        and it will start a web console which listen port 15672
+ *        so you can access it with your web browser, just enter : http://localhost:15672/
+ *        you will see a authorization page, the default user name and password are both `guest`
+ *
+ * step 3 :
+ * Create a test queue use the web console.
+ * after starting the server, you can access `http://localhost:15672/#/queues`
+ * then you will find a button named `Add a new queue`, click it and input `testQueue` into `name` field
+ * and click `Add queue` button to create the queue.
+ *
+ * step 4 :
+ * run this test case to test the publish and subscribe method.
+ */
+public class RabbitmqStreamsTestManual extends ConnectorTestBase {
+
+    private static final int PUB_DELAY_MSEC = 15*1000; // have seen 12sec 1st test's consumer startup delay
+    private static final int SEC_TIMEOUT = 20;
+
+    private final String msg1 = "Hello";
+    private final String msg2 = "Are you there?";
+
+    public String getMsg1() {
+        return msg1;
+    }
+
+    public String getMsg2() {
+        return msg2;
+    }
+
+    private Map<String, Object> initRabbitmqConfig() {
+        Map<String, Object> config = new HashMap<>();
+        config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_HOST, "127.0.0.1");
+        config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_PORT, 5672);
+
+        return config;
+    }
+
+    @Test
+    public void testSimple() throws Exception {
+        Topology t = newTopology("testSimple");
+        Map<String, Object> configMap = initRabbitmqConfig();
+        MsgGenerator generator = new MsgGenerator(t.getName());
+        String queue = "testQueue";
+
+        List<String> msgs = createMsgs(generator, queue, getMsg1(), getMsg2());
+
+        TStream<String> stream = PlumbingStreams.blockingOneShotDelay(
+            t.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
+
+        RabbitmqConsumer consumer = new RabbitmqConsumer(t, () -> configMap);
+
+        TStream<String> receivedStream = consumer.subscribe((byte[] bytes) -> new String(bytes), queue);
+
+        RabbitmqProducer producer = new RabbitmqProducer(t, () -> configMap);
+
+        TSink<String> sink = producer.publish(stream, queue, (String s) -> s.getBytes());
+
+        completeAndValidate("", t, receivedStream, generator, SEC_TIMEOUT, msgs.toArray(new String[0]));
+
+        assertNotNull(sink);
+    }
+
+
+}
diff --git a/pom.xml b/pom.xml
index bbe6867..6450e14 100644
--- a/pom.xml
+++ b/pom.xml
@@ -505,11 +505,28 @@
         <version>1.0.0-SNAPSHOT</version>
         <executions>
           <execution>
-            <id>filter-test-jars</id>
+            <id>filter-deploy-artifacts</id>
             <phase>install</phase>
             <goals>
-              <goal>filter-test-jars</goal>
+              <goal>filter-deploy-artifacts</goal>
             </goals>
+            <configuration>
+              <filterRules>
+                <!-- Filter out all test-jars -->
+                <filterRule>
+                  <type>test-jar</type>
+                </filterRule>
+                <!-- Filter out the signatures of all test-jars -->
+                <filterRule>
+                  <type>jar.asc</type>
+                  <classifier>tests</classifier>
+                </filterRule>
+                <!-- Filter out any source release archives -->
+                <filterRule>
+                  <classifier>source-release</classifier>
+                </filterRule>
+              </filterRules>
+            </configuration>
           </execution>
         </executions>
       </plugin>
diff --git a/utils/edgent-deployment-filter-maven-plugin/pom.xml b/utils/edgent-deployment-filter-maven-plugin/pom.xml
index 64bbf7e..312a7c3 100644
--- a/utils/edgent-deployment-filter-maven-plugin/pom.xml
+++ b/utils/edgent-deployment-filter-maven-plugin/pom.xml
@@ -59,6 +59,13 @@
       <artifactId>plexus-utils</artifactId>
       <version>3.0.8</version>
     </dependency>
+
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-library</artifactId>
+      <version>1.3</version>
+    </dependency>
+
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
diff --git a/utils/edgent-deployment-filter-maven-plugin/src/it/with-plugin/pom.xml b/utils/edgent-deployment-filter-maven-plugin/src/it/with-plugin/pom.xml
index 26dc3cf..466bd6f 100644
--- a/utils/edgent-deployment-filter-maven-plugin/src/it/with-plugin/pom.xml
+++ b/utils/edgent-deployment-filter-maven-plugin/src/it/with-plugin/pom.xml
@@ -70,11 +70,28 @@
         <version>@project.version@</version>
         <executions>
           <execution>
-            <id>filter-test-jars</id>
+            <id>filter-deploy-artifacts</id>
             <phase>install</phase>
             <goals>
-              <goal>filter-test-jars</goal>
+              <goal>filter-deploy-artifacts</goal>
             </goals>
+            <configuration>
+              <filterRules>
+                <!-- Filter out all test-jars -->
+                <filterRule>
+                  <type>test-jar</type>
+                </filterRule>
+                <!-- Filter out the signatures of all test-jars -->
+                <filterRule>
+                  <type>jar.asc</type>
+                  <classifier>tests</classifier>
+                </filterRule>
+                <!-- Filter out any source release archives -->
+                <filterRule>
+                  <classifier>source-release</classifier>
+                </filterRule>
+              </filterRules>
+            </configuration>
           </execution>
         </executions>
       </plugin>
@@ -116,6 +133,33 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <dependencies>
+          <dependency>
+            <groupId>org.apache.apache.resources</groupId>
+            <artifactId>apache-source-release-assembly-descriptor</artifactId>
+            <version>1.0.6</version>
+          </dependency>
+        </dependencies>
+        <executions>
+          <execution>
+            <id>source-release-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+            <configuration>
+              <runOnlyAtExecutionRoot>true</runOnlyAtExecutionRoot>
+              <descriptorRefs>
+                <descriptorRef>${sourceReleaseAssemblyDescriptor}</descriptorRef>
+              </descriptorRefs>
+              <tarLongFileMode>posix</tarLongFileMode>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 </project>
diff --git a/utils/edgent-deployment-filter-maven-plugin/src/it/with-plugin/verify.groovy b/utils/edgent-deployment-filter-maven-plugin/src/it/with-plugin/verify.groovy
index c376a30..58286d4 100644
--- a/utils/edgent-deployment-filter-maven-plugin/src/it/with-plugin/verify.groovy
+++ b/utils/edgent-deployment-filter-maven-plugin/src/it/with-plugin/verify.groovy
@@ -21,33 +21,66 @@
 
 def jarFile = new File(basedir, "target/with-plugin-1.0-SNAPSHOT.jar")
 def testJarFile = new File(basedir, "target/with-plugin-1.0-SNAPSHOT-tests.jar")
+def testJarAscFile = new File(basedir, "target/with-plugin-1.0-SNAPSHOT-tests.jar.asc")
+def sourceReleaseFile = new File(basedir, "target/with-plugin-1.0-SNAPSHOT-source-release.zip")
+def sourceReleaseAscFile = new File(basedir, "target/with-plugin-1.0-SNAPSHOT-source-release.zip.asc")
 
 // The jar file should exist
 assert jarFile.exists() && jarFile.isFile()
 
-// The test-jar should also exist
+// The test-jar and it's signature should also exist
 assert testJarFile.exists() && testJarFile.isFile()
+assert testJarAscFile.exists() && testJarAscFile.isFile()
 
-// The local repo should contain the test-jar.
+// The source release zip and it's signature should exist
+assert sourceReleaseFile.exists() && sourceReleaseFile.isFile()
+assert sourceReleaseAscFile.exists() && sourceReleaseAscFile.isFile()
+
+// The local repo should contain all files.
 def jarLocalRepo = new File("target/maven-repos/local/org/apache/edgent/plugins/it/with-plugin/1.0-SNAPSHOT")
 assert jarLocalRepo.exists()
 def foundTestJarInLocal = false
+def foundTestJarAscInLocal = false
+def foundSourceReleaseZipInLocal = false
+def foundSourceReleaseZipAscInLocal = false
 jarLocalRepo.eachFileRecurse (FileType.FILES) { file ->
     println file.name
     if(file.name.endsWith("tests.jar")) {
         foundTestJarInLocal = true
     }
+    if(file.name.endsWith("tests.jar.asc")) {
+        foundTestJarAscInLocal = true
+    }
+    if(file.name.endsWith("source-release.zip")) {
+        foundSourceReleaseZipInLocal = true
+    }
+    if(file.name.endsWith("source-release.zip.asc")) {
+        foundSourceReleaseZipAscInLocal = true
+    }
 }
-assert foundTestJarInLocal
+assert foundTestJarInLocal && foundTestJarAscInLocal && foundSourceReleaseZipInLocal && foundSourceReleaseZipAscInLocal
 
-// The remote repo shouldn't contain it.
+// The remote repo shouldn't not contain test-jar and source-release and their corresponding signatures.
 def jarRemoteRepo = new File("target/maven-repos/remote/org/apache/edgent/plugins/it/with-plugin/1.0-SNAPSHOT")
 assert jarRemoteRepo.exists()
 def foundTestJarInRemote = false
+def foundTestJarAscInRemote = false
+def foundSourceReleaseZipInRemote = false
+def foundSourceReleaseZipAscInRemote = false
 jarRemoteRepo.eachFileRecurse (FileType.FILES) { file ->
     println file.name
     if(file.name.endsWith("tests.jar")) {
         foundTestJarInRemote = true
     }
+    if(file.name.endsWith("tests.jar.asc")) {
+        foundTestJarAscInRemote = true
+    }
+    if(file.name.endsWith("source-release.zip")) {
+        foundSourceReleaseZipInRemote = true
+    }
+    if(file.name.endsWith("source-release.zip.asc")) {
+        foundSourceReleaseZipAscInRemote = true
+    }
 }
-assert !foundTestJarInRemote
+assert !foundTestJarInRemote && !foundTestJarAscInRemote && !foundSourceReleaseZipInRemote && !foundSourceReleaseZipAscInRemote
+
diff --git a/utils/edgent-deployment-filter-maven-plugin/src/it/without-plugin/pom.xml b/utils/edgent-deployment-filter-maven-plugin/src/it/without-plugin/pom.xml
index e90f322..002017b 100644
--- a/utils/edgent-deployment-filter-maven-plugin/src/it/without-plugin/pom.xml
+++ b/utils/edgent-deployment-filter-maven-plugin/src/it/without-plugin/pom.xml
@@ -101,6 +101,33 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-assembly-plugin</artifactId>
+        <dependencies>
+          <dependency>
+            <groupId>org.apache.apache.resources</groupId>
+            <artifactId>apache-source-release-assembly-descriptor</artifactId>
+            <version>1.0.6</version>
+          </dependency>
+        </dependencies>
+        <executions>
+          <execution>
+            <id>source-release-assembly</id>
+            <phase>package</phase>
+            <goals>
+              <goal>single</goal>
+            </goals>
+            <configuration>
+              <runOnlyAtExecutionRoot>true</runOnlyAtExecutionRoot>
+              <descriptorRefs>
+                <descriptorRef>${sourceReleaseAssemblyDescriptor}</descriptorRef>
+              </descriptorRefs>
+              <tarLongFileMode>posix</tarLongFileMode>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 </project>
diff --git a/utils/edgent-deployment-filter-maven-plugin/src/it/without-plugin/verify.groovy b/utils/edgent-deployment-filter-maven-plugin/src/it/without-plugin/verify.groovy
index e294108..ea7d39b 100644
--- a/utils/edgent-deployment-filter-maven-plugin/src/it/without-plugin/verify.groovy
+++ b/utils/edgent-deployment-filter-maven-plugin/src/it/without-plugin/verify.groovy
@@ -21,33 +21,65 @@
 
 def jarFile = new File(basedir, "target/without-plugin-1.0-SNAPSHOT.jar")
 def testJarFile = new File(basedir, "target/without-plugin-1.0-SNAPSHOT-tests.jar")
+def testJarAscFile = new File(basedir, "target/without-plugin-1.0-SNAPSHOT-tests.jar.asc")
+def sourceReleaseFile = new File(basedir, "target/without-plugin-1.0-SNAPSHOT-source-release.zip")
+def sourceReleaseAscFile = new File(basedir, "target/without-plugin-1.0-SNAPSHOT-source-release.zip.asc")
 
 // The jar file should exist
 assert jarFile.exists() && jarFile.isFile()
 
-// The test-jar should also exist
+// The test-jar and it's signature should also exist
 assert testJarFile.exists() && testJarFile.isFile()
+assert testJarAscFile.exists() && testJarAscFile.isFile()
 
-// The local repo should contain the test-jar.
+// The source release zip and it's signature should exist
+assert sourceReleaseFile.exists() && sourceReleaseFile.isFile()
+assert sourceReleaseAscFile.exists() && sourceReleaseAscFile.isFile()
+
+// The local repo should contain all expected files.
 def jarLocalRepo = new File("target/maven-repos/local/org/apache/edgent/plugins/it/without-plugin/1.0-SNAPSHOT")
 assert jarLocalRepo.exists()
 def foundTestJarInLocal = false
+def foundTestJarAscInLocal = false
+def foundSourceReleaseZipInLocal = false
+def foundSourceReleaseZipAscInLocal = false
 jarLocalRepo.eachFileRecurse (FileType.FILES) { file ->
     println file.name
     if(file.name.endsWith("tests.jar")) {
         foundTestJarInLocal = true
     }
+    if(file.name.endsWith("tests.jar.asc")) {
+        foundTestJarAscInLocal = true
+    }
+    if(file.name.endsWith("source-release.zip")) {
+        foundSourceReleaseZipInLocal = true
+    }
+    if(file.name.endsWith("source-release.zip.asc")) {
+        foundSourceReleaseZipAscInLocal = true
+    }
 }
-assert foundTestJarInLocal
+assert foundTestJarInLocal && foundTestJarAscInLocal && foundSourceReleaseZipInLocal && foundSourceReleaseZipAscInLocal
 
-// The remote repo should contain it too.
+// The remote repo should also contain all of them.
 def jarRemoteRepo = new File("target/maven-repos/remote/org/apache/edgent/plugins/it/without-plugin/1.0-SNAPSHOT")
 assert jarRemoteRepo.exists()
 def foundTestJarInRemote = false
+def foundTestJarAscInRemote = false
+def foundSourceReleaseZipInRemote = false
+def foundSourceReleaseZipAscInRemote = false
 jarRemoteRepo.eachFileRecurse (FileType.FILES) { file ->
     println file.name
     if(file.name.endsWith("tests.jar")) {
         foundTestJarInRemote = true
     }
+    if(file.name.endsWith("tests.jar.asc")) {
+        foundTestJarAscInRemote = true
+    }
+    if(file.name.endsWith("source-release.zip")) {
+        foundSourceReleaseZipInRemote = true
+    }
+    if(file.name.endsWith("source-release.zip.asc")) {
+        foundSourceReleaseZipAscInRemote = true
+    }
 }
-assert foundTestJarInRemote
+assert foundTestJarInRemote && foundTestJarAscInLocal && foundSourceReleaseZipInLocal && foundSourceReleaseZipAscInLocal
diff --git a/utils/edgent-deployment-filter-maven-plugin/src/main/java/org/apache/edgent/plugins/deploymentfilter/FilterDeployArtifactsMojo.java b/utils/edgent-deployment-filter-maven-plugin/src/main/java/org/apache/edgent/plugins/deploymentfilter/FilterDeployArtifactsMojo.java
new file mode 100644
index 0000000..13b427e
--- /dev/null
+++ b/utils/edgent-deployment-filter-maven-plugin/src/main/java/org/apache/edgent/plugins/deploymentfilter/FilterDeployArtifactsMojo.java
@@ -0,0 +1,111 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.plugins.deploymentfilter;
+
+import org.apache.edgent.plugins.deploymentfilter.model.FilterRule;
+import org.apache.maven.artifact.Artifact;
+import org.apache.maven.plugin.AbstractMojo;
+import org.apache.maven.plugin.MojoExecutionException;
+
+import org.apache.maven.plugins.annotations.LifecyclePhase;
+import org.apache.maven.plugins.annotations.Mojo;
+import org.apache.maven.plugins.annotations.Parameter;
+import org.apache.maven.project.MavenProject;
+import org.hamcrest.Matcher;
+import org.hamcrest.beans.HasPropertyWithValue;
+import org.hamcrest.core.AllOf;
+import org.hamcrest.core.AnyOf;
+import org.hamcrest.core.IsEqual;
+import org.sonatype.aether.util.StringUtils;
+
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * Goal which filters all 'test-jar' artifacts from installation and deployment.
+ * The goal is added to the 'install' phase as this way it is executed after the install plugin,
+ * but before the deploy, which is the phase we don't want the artifact to be handled.
+ */
+@Mojo( name = "filter-deploy-artifacts", defaultPhase = LifecyclePhase.INSTALL )
+public class FilterDeployArtifactsMojo
+    extends AbstractMojo
+{
+
+    @Parameter(defaultValue="${project}")
+    private MavenProject project;
+
+    @Parameter
+    private List<FilterRule> filterRules;
+
+    public void execute()
+        throws MojoExecutionException
+    {
+        List<Matcher<? super Artifact>> filter = new LinkedList<Matcher<? super Artifact>>();
+        for (FilterRule filterRule : filterRules) {
+            List<Matcher<? super Artifact>> curFilter = new LinkedList<Matcher<? super Artifact>>();
+            if(!StringUtils.isEmpty(filterRule.getType())) {
+                curFilter.add(HasPropertyWithValue.hasProperty("type",
+                    IsEqual.equalTo(filterRule.getType())));
+            }
+            if(!StringUtils.isEmpty(filterRule.getClassifier())) {
+                curFilter.add(HasPropertyWithValue.hasProperty("classifier",
+                    IsEqual.equalTo(filterRule.getClassifier())));
+            }
+            if(!StringUtils.isEmpty(filterRule.getGroupId())) {
+                curFilter.add(HasPropertyWithValue.hasProperty("groupId",
+                    IsEqual.equalTo(filterRule.getGroupId())));
+            }
+            if(!StringUtils.isEmpty(filterRule.getArtifactId())) {
+                curFilter.add(HasPropertyWithValue.hasProperty("artifactId",
+                    IsEqual.equalTo(filterRule.getArtifactId())));
+            }
+            if(!StringUtils.isEmpty(filterRule.getVersion())) {
+                curFilter.add(HasPropertyWithValue.hasProperty("version",
+                    IsEqual.equalTo(filterRule.getVersion())));
+            }
+            if(!StringUtils.isEmpty(filterRule.getScope())) {
+                curFilter.add(HasPropertyWithValue.hasProperty("scope",
+                    IsEqual.equalTo(filterRule.getScope())));
+            }
+            if(!curFilter.isEmpty()) {
+                filter.add(new AllOf<Artifact>(curFilter));
+            }
+        }
+        AnyOf<Artifact> matcher = new AnyOf<Artifact>(filter);
+
+        // Find all 'test-jar' artifacts.
+        // (This has to be done in separate loops in order to prevent
+        // concurrent modification exceptions.
+        List<Artifact> toBeRemovedArtifacts = new LinkedList<Artifact>();
+        for(Artifact artifact : project.getAttachedArtifacts()) {
+            if(matcher.matches(artifact)) {
+                toBeRemovedArtifacts.add(artifact);
+            }
+        }
+
+        // Remove all of them from the list of attached artifacts.
+        if(!toBeRemovedArtifacts.isEmpty()) {
+            for (Artifact toBeRemovedArtifact : toBeRemovedArtifacts) {
+                getLog().info(" - Excluding artifact " + toBeRemovedArtifact.getArtifactId() +
+                    " from deployment.");
+                project.getAttachedArtifacts().remove(toBeRemovedArtifact);
+            }
+        }
+    }
+}
diff --git a/utils/edgent-deployment-filter-maven-plugin/src/main/java/org/apache/edgent/plugins/deploymentfilter/FilterTestJarsMojo.java b/utils/edgent-deployment-filter-maven-plugin/src/main/java/org/apache/edgent/plugins/deploymentfilter/FilterTestJarsMojo.java
deleted file mode 100644
index 0315f19..0000000
--- a/utils/edgent-deployment-filter-maven-plugin/src/main/java/org/apache/edgent/plugins/deploymentfilter/FilterTestJarsMojo.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-package org.apache.edgent.plugins.deploymentfilter;
-
-import org.apache.maven.artifact.Artifact;
-import org.apache.maven.plugin.AbstractMojo;
-import org.apache.maven.plugin.MojoExecutionException;
-
-import org.apache.maven.plugins.annotations.LifecyclePhase;
-import org.apache.maven.plugins.annotations.Mojo;
-import org.apache.maven.plugins.annotations.Parameter;
-import org.apache.maven.project.MavenProject;
-
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Goal which filters all 'test-jar' artifacts from installation and deployment.
- * The goal is added to the 'install' phase as this way it is executed after the install plugin,
- * but before the deploy, which is the phase we don't want the artifact to be handled.
- */
-@Mojo( name = "filter-test-jars", defaultPhase = LifecyclePhase.INSTALL )
-public class FilterTestJarsMojo
-    extends AbstractMojo
-{
-
-    @Parameter(defaultValue="${project}")
-    private MavenProject project;
-
-    public void execute()
-        throws MojoExecutionException
-    {
-        // Find all 'test-jar' artifacts.
-        // (This has to be done in separate loops in order to prevent
-        // concurrent modification exceptions.
-        List<Artifact> toBeRemovedArtifacts = new LinkedList<Artifact>();
-        for(Artifact artifact : project.getAttachedArtifacts()) {
-            if("test-jar".equals(artifact.getType()) ||
-                ("jar.asc".equals(artifact.getType()) && "tests".equals(artifact.getClassifier()))) {
-                toBeRemovedArtifacts.add(artifact);
-            }
-        }
-
-        // Remove all of them from the list of attached artifacts.
-        if(!toBeRemovedArtifacts.isEmpty()) {
-            for (Artifact toBeRemovedArtifact : toBeRemovedArtifacts) {
-                getLog().info(" - Excluding test-jar artifact " + toBeRemovedArtifact.getArtifactId() +
-                    " from deployment.");
-                project.getAttachedArtifacts().remove(toBeRemovedArtifact);
-            }
-        }
-    }
-}
diff --git a/utils/edgent-deployment-filter-maven-plugin/src/main/java/org/apache/edgent/plugins/deploymentfilter/model/FilterRule.java b/utils/edgent-deployment-filter-maven-plugin/src/main/java/org/apache/edgent/plugins/deploymentfilter/model/FilterRule.java
new file mode 100644
index 0000000..9f12813
--- /dev/null
+++ b/utils/edgent-deployment-filter-maven-plugin/src/main/java/org/apache/edgent/plugins/deploymentfilter/model/FilterRule.java
@@ -0,0 +1,76 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+package org.apache.edgent.plugins.deploymentfilter.model;
+
+public class FilterRule {
+    private String groupId;
+    private String artifactId;
+    private String version;
+    private String type;
+    private String classifier;
+    private String scope;
+
+    public String getGroupId() {
+        return groupId;
+    }
+
+    public void setGroupId(String groupId) {
+        this.groupId = groupId;
+    }
+
+    public String getArtifactId() {
+        return artifactId;
+    }
+
+    public void setArtifactId(String artifactId) {
+        this.artifactId = artifactId;
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    public void setVersion(String version) {
+        this.version = version;
+    }
+
+    public String getType() {
+        return type;
+    }
+
+    public void setType(String type) {
+        this.type = type;
+    }
+
+    public String getClassifier() {
+        return classifier;
+    }
+
+    public void setClassifier(String classifier) {
+        this.classifier = classifier;
+    }
+
+    public String getScope() {
+        return scope;
+    }
+
+    public void setScope(String scope) {
+        this.scope = scope;
+    }
+}