[EDGENT-447][Connectors] contribute a RabbitMQ connector
diff --git a/connectors/pom.xml b/connectors/pom.xml
index 9190cfa..edc2e5b 100644
--- a/connectors/pom.xml
+++ b/connectors/pom.xml
@@ -49,6 +49,7 @@
<module>websocket-jetty</module>
<module>websocket-misc</module>
<module>websocket-server</module>
+ <module>rabbitmq</module>
</modules>
</project>
diff --git a/connectors/rabbitmq/pom.xml b/connectors/rabbitmq/pom.xml
new file mode 100644
index 0000000..9c67fad
--- /dev/null
+++ b/connectors/rabbitmq/pom.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>edgent-connectors</artifactId>
+ <groupId>org.apache.edgent</groupId>
+ <version>1.3.0-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>edgent-connectors-rabbitmq</artifactId>
+
+ <name>Apache Edgent (Java 8): Connectors: RabbitMQ</name>
+
+ <properties>
+ <remote-resources-maven-plugin.remote-resources.dir>../../src/main/ibm-remote-resources</remote-resources-maven-plugin.remote-resources.dir>
+ <edgent.version></edgent.version>
+ <rabbitmq.version>5.1.2</rabbitmq.version>
+ </properties>
+
+ <dependencies>
+ <!-- edgent start -->
+ <dependency>
+ <groupId>org.apache.edgent</groupId>
+ <artifactId>edgent-api-topology</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ </dependency>
+ <!-- edgent end -->
+
+ <!-- edgent test start -->
+ <dependency>
+ <groupId>org.apache.edgent</groupId>
+ <artifactId>edgent-providers-direct</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.edgent</groupId>
+ <artifactId>edgent-providers-direct</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.edgent</groupId>
+ <artifactId>edgent-connectors-common</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.edgent</groupId>
+ <artifactId>edgent-api-topology</artifactId>
+ <version>1.3.0-SNAPSHOT</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <!-- edgent test end -->
+
+ <!-- rabbitmq start -->
+ <dependency>
+ <groupId>com.rabbitmq</groupId>
+ <artifactId>amqp-client</artifactId>
+ <version>${rabbitmq.version}</version>
+ </dependency>
+ <!-- rabbitmq end -->
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConfigKeyConstants.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConfigKeyConstants.java
new file mode 100644
index 0000000..f35a8fe
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConfigKeyConstants.java
@@ -0,0 +1,92 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.rabbitmq;
+
+/**
+ * Defines all RabbitMQ config key constant items.
+ */
+public class RabbitmqConfigKeyConstants {
+
+ /**
+ * config key for connection URI, eg: amqp://userName:password@hostName:portNumber/virtualHost
+ */
+ public static final String RABBITMQ_CONFIG_KEY_URI = "rabbitmq.connection.uri";
+
+ /**
+ * config key for RabbitMQ server host
+ */
+ public static final String RABBITMQ_CONFIG_KEY_HOST = "rabbitmq.connection.host";
+
+ /**
+ * config key for RabbitMQ server port, default port is : 5672
+ */
+ public static final String RABBITMQ_CONFIG_KEY_PORT = "rabbitmq.connection.port";
+
+ /**
+ * config key for virtual host which used to split multi-users
+ */
+ public static final String RABBITMQ_CONFIG_KEY_VIRTUAL_HOST = "rabbitmq.connection.virtualHost";
+
+ /**
+ * config key for authorization (user name)
+ */
+ public static final String RABBITMQ_CONFIG_KEY_AUTH_NAME = "rabbitmq.connection.authUsername";
+
+ /**
+ * config key for authorization (password)
+ */
+ public static final String RABBITMQ_CONFIG_KEY_AUTH_PASSWORD = "rabbitmq.connection.authPassword";
+
+ /**
+ * config key for specifying whether enable auto recovery or not.
+ */
+ public static final String RABBITMQ_CONFIG_KEY_AUTO_RECOVERY = "rabbitmq.connection.autoRecovery";
+
+ /**
+ * config key for connection timeout
+ */
+ public static final String RABBITMQ_CONFIG_KEY_TIMEOUT = "rabbitmq.connection.timeout";
+
+ /**
+ * config key for connection network recovery interval
+ */
+ public static final String RABBITMQ_CONFIG_KEY_NETWORK_RECOVERY_INTERVAL = "rabbitmq.connection.networkRecoveryInterval";
+
+ /**
+ * config key for connection requested-heartbeat
+ */
+ public static final String RABBITMQ_CONFIG_KEY_REQUESTED_HEARTBEAT = "rabbitmq.connection.requestedHeartbeat";
+
+ /**
+ * config key for specifying whether enable topology recovery or not.
+ */
+ public static final String RABBITMQ_CONFIG_KEY_TOPOLOGY_RECOVERY_ENABLED = "rabbitmq.connection.topologyRecoveryEnabled";
+
+ /**
+ * config key for connection requested channel max num
+ */
+ public static final String RABBITMQ_CONFIG_KEY_REQUESTED_CHANNEL_MAX = "rabbitmq.connection.requestedChannelMax";
+
+ /**
+ * config key for connection requested frame max
+ */
+ public static final String RABBITMQ_CONFIG_KEY_REQUESTED_FRAME_MAX = "rabbitmq.connection.requestedFrameMax";
+
+}
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConsumer.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConsumer.java
new file mode 100644
index 0000000..4c2e881
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqConsumer.java
@@ -0,0 +1,87 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.rabbitmq;
+
+import org.apache.edgent.connectors.rabbitmq.runtime.RabbitmqConnector;
+import org.apache.edgent.connectors.rabbitmq.runtime.RabbitmqSubscriber;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+import java.util.Map;
+
+/**
+ * {@code RabbitmqConsumer} is a consumer to consume messages from a RabbitMQ messaging broker
+ * <p>
+ * The connector uses and includes components from the RabbitMQ 3.7.3 release.
+ * It has been successfully tested against 3.7.3.
+ * For more information about RabbitMQ see <a href="http://www.rabbitmq.com/">http://www.rabbitmq.com/</a>
+ * </p>
+ * Smaple use:
+ * <pre>{@code
+ * Map<String, Object> config = new HashMap<>();
+ * config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_HOST, "127.0.0.1");
+ * config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_PORT, 5672);
+ * String queue = "testQueue";
+ *
+ * Topology t = ...
+ *
+ * RabbitmqConsumer consumer = new RabbitmqConsumer(t, () -> configMap);
+ * TStream<String> receivedStream = consumer.subscribe((byte[] bytes) -> new String(bytes), queue);
+ *
+ * //...
+ * }
+ * </pre>
+ */
+public class RabbitmqConsumer {
+
+ private final RabbitmqConnector connector;
+ private final Topology topology;
+
+ /**
+ * Create a consumer connector for consuming tuples from a RabbitMQ queue.
+ * <p>
+ * See the RabbitMQ java client document :
+ * <a href="http://www.rabbitmq.com/api-guide.html">http://www.rabbitmq.com/api-guide.html</a>
+ * The full config option please see RabbitMQ java client API Reference :
+ * < a href="https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/ConnectionFactory.html">ConnectionFactory</>
+ * </p>
+ * @param topology topology to add to
+ * @param config RabbitmqProducer configuration information.
+ */
+ public RabbitmqConsumer(Topology topology, Supplier<Map<String, Object>> config) {
+ this.topology = topology;
+ this.connector = new RabbitmqConnector(config);
+ }
+
+ /**
+ * Subscribe to the specified topics and yield a stream of tuples from the published RabbitMQ records.
+ *
+ * @param toTupleFn A function that yields a tuple from a byte array,
+ * @param queue the specified RabbitMQ queue
+ * @param <T> A function that yields a tuple from a
+ * @return stream of tuples
+ */
+ public <T> TStream<T> subscribe(Function<byte[], T> toTupleFn, String queue) {
+ return topology.events(new RabbitmqSubscriber<>(connector, queue, toTupleFn));
+ }
+
+}
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqProducer.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqProducer.java
new file mode 100644
index 0000000..8437285
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/RabbitmqProducer.java
@@ -0,0 +1,99 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.rabbitmq;
+
+import org.apache.edgent.connectors.rabbitmq.runtime.RabbitmqConnector;
+import org.apache.edgent.connectors.rabbitmq.runtime.RabbitmqPublisher;
+import org.apache.edgent.function.Function;
+import org.apache.edgent.function.Supplier;
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+
+import java.util.Map;
+
+/**
+ * {@code RabbitmqProducer} is a producer to produce messages to a RabbitMQ messaging broker
+ * <p>
+ * The connector uses and includes components from the RabbitMQ 3.7.3 release.
+ * It has been successfully tested against 3.7.3.
+ * For more information about RabbitMQ see <a href="http://www.rabbitmq.com/">http://www.rabbitmq.com/</a>
+ * </p>
+ * Sample use:
+ * <pre>{@code
+ * Map<String, Object> config = new HashMap<>();
+ * config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_HOST, "127.0.0.1");
+ * config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_PORT, 5672);
+ * String queue = "testQueue";
+ *
+ * Topology t = newTopology("testSimple");
+ * RabbitmqProducer producer = new RabbitmqProducer(t, () -> config);
+ *
+ * //TStream<String> stream = ...
+ *
+ * TSink<String> sink = producer.publish(stream, queue, (String s) -> s.getBytes());
+ * }
+ * </pre>
+ */
+public class RabbitmqProducer {
+
+ private final RabbitmqConnector connector;
+ private final Topology topology;
+
+ /**
+ * Create a producer connector for publishing tuples to a RabbitMQ queue.
+ * <p>
+ * See the RabbitMQ java client document :
+ * <a href="http://www.rabbitmq.com/api-guide.html">http://www.rabbitmq.com/api-guide.html</a>
+ * The full config option please see RabbitMQ java client API Reference :
+ * < a href="https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/ConnectionFactory.html">ConnectionFactory</>
+ * </p>
+ * @param topology topology to add to
+ * @param config RabbitmqProducer configuration information.
+ */
+ public RabbitmqProducer(Topology topology, Supplier<Map<String, Object>> config) {
+ this.topology = topology;
+ this.connector = new RabbitmqConnector(config);
+ }
+
+ /**
+ * Publish the stream of tuples to the specified queue.
+ * @param stream The stream to publish
+ * @param queue The specified queue of RabbitMQ
+ * @param msgFn A function that yields the byte[] records from the tuple
+ * @param <T> Tuple type
+ * @return {@link TSink}
+ */
+ public <T> TSink<T> publish(TStream<T> stream, String queue, Function<T, byte[]> msgFn) {
+ return stream.sink(new RabbitmqPublisher<>(connector, queue, msgFn));
+ }
+
+ /**
+ * Publish the stream of tuples to the specified queue.
+ * @param stream The stream to publish
+ * @param queue The specified queue of RabbitMQ
+ * @param msg The string message to publish
+ * @return
+ */
+ public TSink<String> publish(TStream<String> stream, String queue, String msg) {
+ return publish(stream, queue, (String m) -> msg.getBytes());
+ }
+
+}
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/package-info.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/package-info.java
new file mode 100644
index 0000000..49dafd2
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/package-info.java
@@ -0,0 +1,28 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/**
+ * RabbitMQ stream connector.
+ * <P>
+ * Stream tuples may be published to RabbitMQ queues
+ * and created by subscribing to queues.
+ * For more information about RabbitMQ see
+ * <a href="http://www.rabbitmq.com/">http://www.rabbitmq.com/</a>
+ */
+package org.apache.edgent.connectors.rabbitmq;
\ No newline at end of file
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqConnector.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqConnector.java
new file mode 100644
index 0000000..b83cfb6
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqConnector.java
@@ -0,0 +1,188 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.rabbitmq.runtime;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import org.apache.edgent.function.Supplier;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_AUTH_NAME;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_AUTH_PASSWORD;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_AUTO_RECOVERY;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_HOST;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_NETWORK_RECOVERY_INTERVAL;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_PORT;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_REQUESTED_CHANNEL_MAX;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_REQUESTED_FRAME_MAX;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_REQUESTED_HEARTBEAT;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_TIMEOUT;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_TOPOLOGY_RECOVERY_ENABLED;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_URI;
+import static org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_VIRTUAL_HOST;
+
+/**
+ * A connector to an RabbitMQ server.
+ */
+public class RabbitmqConnector implements AutoCloseable {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory.getLogger(RabbitmqConnector.class);
+
+ private final Supplier<Map<String, Object>> configFn;
+ private ConnectionFactory connectionFactory;
+ private Connection connection;
+ private Channel channel;
+ private String id;
+
+ public RabbitmqConnector(Supplier<Map<String, Object>> configFn) {
+ this.configFn = configFn;
+ initConnection();
+ }
+
+ public synchronized Channel channel() {
+ if (channel == null) {
+ if (connection != null) {
+ try {
+ channel = connection.createChannel();
+ } catch (IOException e) {
+ logger.error("IOExcetion occurs when create connection channel {}", e);
+ throw new RuntimeException(e);
+ } catch (Exception e) {
+ logger.error("Unknown Exception : {}", e);
+ }
+ } else {
+ logger.error("Inner statue inconformity : the rabbitmq connection is null.");
+ throw new RuntimeException("Inner statue inconformity : the rabbitmq connection is null.");
+ }
+ }
+
+ return channel;
+ }
+
+ @Override
+ public synchronized void close() throws Exception {
+ if (channel != null) {
+ channel.close();
+ }
+
+ if (connection != null) {
+ connection.close();
+ }
+
+ if (connectionFactory != null) {
+ connectionFactory = null;
+ }
+ }
+
+ public String id() {
+ if (id == null) {
+ // include our short object Id
+ id = "RabbitMQ " + toString().substring(toString().indexOf('@') + 1);
+ }
+ return id;
+ }
+
+ private void initConnection() {
+ try {
+ this.connectionFactory = getConnectionFactory();
+ this.connection = connectionFactory.newConnection();
+ } catch (Exception e) {
+ logger.error("{}", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private ConnectionFactory getConnectionFactory() throws Exception {
+ ConnectionFactory connectionFactory = new ConnectionFactory();
+ Map<String, Object> configMap = configFn.get();
+
+ if (configMap.containsKey(RABBITMQ_CONFIG_KEY_URI)) {
+ connectionFactory.setUri(configMap.get(RABBITMQ_CONFIG_KEY_URI).toString());
+ } else {
+ if (!configMap.containsKey(RABBITMQ_CONFIG_KEY_HOST)) {
+ throw new RuntimeException("Missed key : " + RABBITMQ_CONFIG_KEY_HOST);
+ }
+
+ connectionFactory.setHost(configMap.get(RABBITMQ_CONFIG_KEY_HOST).toString());
+
+ if (!configMap.containsKey(RABBITMQ_CONFIG_KEY_PORT)) {
+ throw new RuntimeException("Missed key : " + RABBITMQ_CONFIG_KEY_PORT);
+ }
+
+ connectionFactory.setPort(Integer.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_PORT).toString()));
+ }
+
+ if (configMap.containsKey(RABBITMQ_CONFIG_KEY_VIRTUAL_HOST)) {
+ connectionFactory.setVirtualHost(configMap.get(RABBITMQ_CONFIG_KEY_VIRTUAL_HOST).toString());
+ }
+
+ if (configMap.containsKey(RABBITMQ_CONFIG_KEY_AUTH_NAME)) {
+ connectionFactory.setUsername(configMap.get(RABBITMQ_CONFIG_KEY_AUTH_NAME).toString());
+ }
+
+ if (configMap.containsKey(RABBITMQ_CONFIG_KEY_AUTH_PASSWORD)) {
+ connectionFactory.setPassword(configMap.get(RABBITMQ_CONFIG_KEY_AUTH_PASSWORD).toString());
+ }
+
+ if (configMap.containsKey(RABBITMQ_CONFIG_KEY_AUTO_RECOVERY)) {
+ connectionFactory.setAutomaticRecoveryEnabled(
+ Boolean.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_AUTO_RECOVERY).toString()));
+ }
+
+ if (configMap.containsKey(RABBITMQ_CONFIG_KEY_TIMEOUT)) {
+ connectionFactory.setConnectionTimeout(Integer.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_TIMEOUT).toString()));
+ }
+
+ if (configMap.containsKey(RABBITMQ_CONFIG_KEY_NETWORK_RECOVERY_INTERVAL)) {
+ connectionFactory.setNetworkRecoveryInterval(
+ Integer.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_NETWORK_RECOVERY_INTERVAL).toString()));
+ }
+
+ if (configMap.containsKey(RABBITMQ_CONFIG_KEY_REQUESTED_HEARTBEAT)) {
+ connectionFactory.setRequestedHeartbeat(
+ Integer.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_REQUESTED_HEARTBEAT).toString()));
+ }
+
+ if (configMap.containsKey(RABBITMQ_CONFIG_KEY_TOPOLOGY_RECOVERY_ENABLED)) {
+ connectionFactory.setTopologyRecoveryEnabled(
+ Boolean.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_TOPOLOGY_RECOVERY_ENABLED).toString()));
+ }
+
+ if (configMap.containsKey(RABBITMQ_CONFIG_KEY_REQUESTED_CHANNEL_MAX)) {
+ connectionFactory.setRequestedChannelMax(
+ Integer.parseInt(configMap.get(RABBITMQ_CONFIG_KEY_REQUESTED_CHANNEL_MAX).toString()));
+ }
+
+ if (configMap.containsKey(RABBITMQ_CONFIG_KEY_REQUESTED_FRAME_MAX)) {
+ connectionFactory.setRequestedChannelMax(
+ Integer.valueOf(configMap.get(RABBITMQ_CONFIG_KEY_REQUESTED_FRAME_MAX).toString()));
+ }
+
+ return connectionFactory;
+ }
+
+
+}
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqPublisher.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqPublisher.java
new file mode 100644
index 0000000..67c2d45
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqPublisher.java
@@ -0,0 +1,74 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.rabbitmq.runtime;
+
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * a publisher for RabbitMQ connector
+ */
+public class RabbitmqPublisher<T> implements Consumer<T>, AutoCloseable {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory.getLogger(RabbitmqPublisher.class);
+
+ private final RabbitmqConnector connector;
+ private final Function<T, byte[]> msgFn;
+ private final String queue;
+ private String id;
+
+ public RabbitmqPublisher(RabbitmqConnector connector, String queue, Function<T, byte[]> msgFn) {
+ this.connector = connector;
+ this.queue = queue;
+ this.msgFn = msgFn;
+ }
+
+ @Override
+ public synchronized void close() throws Exception {
+ logger.info("{} is closing.", id());
+ connector.close();
+ logger.info("{} is closed.", id());
+ }
+
+ @Override
+ public void accept(T value) {
+ byte[] msg = msgFn.apply(value);
+ try {
+ connector.channel().basicPublish("", queue, null, msg);
+ } catch (IOException e) {
+ logger.error("publish exception : {}", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String id() {
+ if (id == null) {
+ // include our short object Id
+ id = connector.id() + " PUB " + toString().substring(toString().indexOf('@') + 1);
+ }
+
+ return id;
+ }
+}
diff --git a/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqSubscriber.java b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqSubscriber.java
new file mode 100644
index 0000000..40c2d45
--- /dev/null
+++ b/connectors/rabbitmq/src/main/java/org/apache/edgent/connectors/rabbitmq/runtime/RabbitmqSubscriber.java
@@ -0,0 +1,111 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.connectors.rabbitmq.runtime;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.DefaultConsumer;
+import com.rabbitmq.client.Envelope;
+import org.apache.edgent.function.Consumer;
+import org.apache.edgent.function.Function;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * a subscriber for RabbitMQ connector
+ */
+public class RabbitmqSubscriber<T> implements Consumer<Consumer<T>>, AutoCloseable {
+
+ private static final long serialVersionUID = 1L;
+ private static final Logger logger = LoggerFactory.getLogger(RabbitmqSubscriber.class);
+
+ private final RabbitmqConnector connector;
+ private Function<byte[], T> toTupleFn;
+ private Consumer<T> eventSubmitter;
+ private ExecutorService executor;
+ private String queue;
+ private String id;
+
+ public RabbitmqSubscriber(RabbitmqConnector connector, String queue, Function<byte[], T> toTupleFn) {
+ this.connector = connector;
+ this.queue = queue;
+ this.toTupleFn = toTupleFn;
+ }
+
+ @Override
+ public synchronized void close() throws Exception {
+ logger.info("{} is closing.", id());
+ if (executor != null && !executor.isShutdown()) {
+ executor.shutdownNow();
+ }
+
+ connector.close();
+ logger.info("{} is closed", id());
+ }
+
+ @Override
+ public void accept(Consumer<T> eventSubmitter) {
+ this.eventSubmitter = eventSubmitter;
+
+ executor = Executors.newFixedThreadPool(1);
+
+ executor.submit(() -> {
+ boolean autoAck = false;
+ try {
+ connector.channel().basicConsume(queue, autoAck,
+ new DefaultConsumer(connector.channel()) {
+ @Override
+ public void handleDelivery(String consumerTag,
+ Envelope envelope,
+ AMQP.BasicProperties properties,
+ byte[] body)
+ throws IOException {
+ long deliveryTag = envelope.getDeliveryTag();
+
+ acceptCallback(body);
+
+ connector.channel().basicAck(deliveryTag, false);
+ }
+ });
+ } catch (IOException e) {
+ logger.error("Consumer exception : {}", e);
+ }
+ });
+
+ }
+
+ private void acceptCallback(byte[] msg) {
+ T tuple = toTupleFn.apply(msg);
+ eventSubmitter.accept(tuple);
+ }
+
+ public String id() {
+ if (id == null) {
+ // include our short object Id
+ id = connector.id() + " SUB " + toString().substring(toString().indexOf('@') + 1);
+ }
+
+ return id;
+ }
+
+}
diff --git a/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsGlobalTestManual.java b/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsGlobalTestManual.java
new file mode 100644
index 0000000..9565bf9
--- /dev/null
+++ b/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsGlobalTestManual.java
@@ -0,0 +1,38 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.test.connectors.rabbitmq;
+
+/**
+ * RabbitmqStreams connector globalization tests.
+ */
+public class RabbitmqStreamsGlobalTestManual extends RabbitmqStreamsTestManual {
+
+ private static final String globalMsg1 = "你好";
+ private static final String globalMsg2 = "你在嗎";
+
+ public String getMsg1() {
+ return globalMsg1;
+ }
+
+ public String getMsg2() {
+ return globalMsg2;
+ }
+
+}
diff --git a/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsTestManual.java b/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsTestManual.java
new file mode 100644
index 0000000..02ea0bc
--- /dev/null
+++ b/connectors/rabbitmq/src/test/java/org/apache/edgent/test/connectors/rabbitmq/RabbitmqStreamsTestManual.java
@@ -0,0 +1,118 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package org.apache.edgent.test.connectors.rabbitmq;
+
+import org.apache.edgent.connectors.rabbitmq.RabbitmqConfigKeyConstants;
+import org.apache.edgent.connectors.rabbitmq.RabbitmqConsumer;
+import org.apache.edgent.connectors.rabbitmq.RabbitmqProducer;
+import org.apache.edgent.test.connectors.common.ConnectorTestBase;
+import org.apache.edgent.topology.TSink;
+import org.apache.edgent.topology.TStream;
+import org.apache.edgent.topology.Topology;
+import org.apache.edgent.topology.plumbing.PlumbingStreams;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * A {@link RabbitmqConsumer} manual test case.
+ * Please follow there steps
+ *
+ * step 1 :
+ * Install RabbitMQ server.
+ * For Mac os x, you can use homebrew to install it, the simple command is : `brew install rabbitmq`
+ * For other system, please follow RabbitMQ's offical document :
+ * <a href="http://www.rabbitmq.com/download.html">http://www.rabbitmq.com/download.html</a>
+ *
+ * step 2 :
+ * Start the RabbitMQ server.
+ * For Mac os x, if you install it with homebrew, you can start it with : `brew services start rabbitmq`
+ * For other system, please follow RabbitMQ's offical document :
+ * <a href="http://www.rabbitmq.com/download.html">http://www.rabbitmq.com/download.html</a>
+ * Note : the default port which RabbitMQ server listen is 5672
+ * and it will start a web console which listen port 15672
+ * so you can access it with your web browser, just enter : http://localhost:15672/
+ * you will see a authorization page, the default user name and password are both `guest`
+ *
+ * step 3 :
+ * Create a test queue use the web console.
+ * after starting the server, you can access `http://localhost:15672/#/queues`
+ * then you will find a button named `Add a new queue`, click it and input `testQueue` into `name` field
+ * and click `Add queue` button to create the queue.
+ *
+ * step 4 :
+ * run this test case to test the publish and subscribe method.
+ */
+public class RabbitmqStreamsTestManual extends ConnectorTestBase {
+
+ private static final int PUB_DELAY_MSEC = 15*1000; // have seen 12sec 1st test's consumer startup delay
+ private static final int SEC_TIMEOUT = 20;
+
+ private final String msg1 = "Hello";
+ private final String msg2 = "Are you there?";
+
+ public String getMsg1() {
+ return msg1;
+ }
+
+ public String getMsg2() {
+ return msg2;
+ }
+
+ private Map<String, Object> initRabbitmqConfig() {
+ Map<String, Object> config = new HashMap<>();
+ config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_HOST, "127.0.0.1");
+ config.put(RabbitmqConfigKeyConstants.RABBITMQ_CONFIG_KEY_PORT, 5672);
+
+ return config;
+ }
+
+ @Test
+ public void testSimple() throws Exception {
+ Topology t = newTopology("testSimple");
+ Map<String, Object> configMap = initRabbitmqConfig();
+ MsgGenerator generator = new MsgGenerator(t.getName());
+ String queue = "testQueue";
+
+ List<String> msgs = createMsgs(generator, queue, getMsg1(), getMsg2());
+
+ TStream<String> stream = PlumbingStreams.blockingOneShotDelay(
+ t.collection(msgs), PUB_DELAY_MSEC, TimeUnit.MILLISECONDS);
+
+ RabbitmqConsumer consumer = new RabbitmqConsumer(t, () -> configMap);
+
+ TStream<String> receivedStream = consumer.subscribe((byte[] bytes) -> new String(bytes), queue);
+
+ RabbitmqProducer producer = new RabbitmqProducer(t, () -> configMap);
+
+ TSink<String> sink = producer.publish(stream, queue, (String s) -> s.getBytes());
+
+ completeAndValidate("", t, receivedStream, generator, SEC_TIMEOUT, msgs.toArray(new String[0]));
+
+ assertNotNull(sink);
+ }
+
+
+}