Add a Pulsar IO connector for RabbitMQ sink. (#3967)
### Motivation
Provides a builtin RabbitMQ Sink Connector, in order to persist pulsar messages to a RabbitMQ queue.
### Modifications
Add a RabbitMQ Sink and some unit tests.
### Verifying this change
This change can be verified as follows:
* deploy the RabbitMQ sink connector with configuration file containing the following fields:
```
configs:
host: "localhost"
port: "5672"
virtualHost: "/"
username: "quest"
password: "quest"
queueName: "test_queue"
connectionName: "test_connection"
exchangeName: "test_exchange"
routingKey: "test_routing"
```
* deploy an RabbitMQ cluster and create the above elements
* send messages in the topic with specified schema declared when deploying the connector
* use `rabbitmqadmin` or `rabbitmqctl` to query messages in the specified queue
diff --git a/pulsar-io/rabbitmq/pom.xml b/pulsar-io/rabbitmq/pom.xml
index 5c75068..008ee06 100644
--- a/pulsar-io/rabbitmq/pom.xml
+++ b/pulsar-io/rabbitmq/pom.xml
@@ -37,6 +37,22 @@
<artifactId>pulsar-io-core</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-functions-instance</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-original</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
new file mode 100644
index 0000000..43a980f
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.rabbitmq;
+
+import com.rabbitmq.client.BuiltinExchangeType;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/**
+ * A Simple RabbitMQ sink, which transfer records from Pulsar to RabbitMQ.
+ * This class expects records from Pulsar to have values that are stored as bytes or string.
+ */
+@Connector(
+ name = "rabbitmq",
+ type = IOType.SINK,
+ help = "A sink connector is used for moving messages from Pulsar to RabbitMQ.",
+ configClass = RabbitMQSinkConfig.class
+)
+@Slf4j
+public class RabbitMQSink<T> implements Sink<T> {
+
+ private Connection rabbitMQConnection;
+ private Channel rabbitMQChannel;
+ private RabbitMQSinkConfig rabbitMQSinkConfig;
+ private String exchangeName;
+ private String routingKey;
+
+ @Override
+ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+ rabbitMQSinkConfig = RabbitMQSinkConfig.load(config);
+ rabbitMQSinkConfig.validate();
+
+ ConnectionFactory connectionFactory = rabbitMQSinkConfig.createConnectionFactory();
+ rabbitMQConnection = connectionFactory.newConnection(rabbitMQSinkConfig.getConnectionName());
+ log.info("A new connection to {}:{} has been opened successfully.",
+ rabbitMQConnection.getAddress().getCanonicalHostName(),
+ rabbitMQConnection.getPort()
+ );
+
+ exchangeName = rabbitMQSinkConfig.getExchangeName();
+ routingKey = rabbitMQSinkConfig.getRoutingKey();
+
+ rabbitMQChannel = rabbitMQConnection.createChannel();
+
+ // several clients share a queue
+ rabbitMQChannel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);
+ rabbitMQChannel.queueDeclare(rabbitMQSinkConfig.getQueueName(), true, false, false, null);
+ rabbitMQChannel.queueBind(rabbitMQSinkConfig.getQueueName(), exchangeName, routingKey);
+ }
+
+ @Override
+ public void write(Record<T> record) {
+ byte[] value = toBytes(record.getValue());
+ try {
+ rabbitMQChannel.basicPublish(exchangeName, routingKey, null, value);
+ record.ack();
+ } catch (IOException e) {
+ record.fail();
+ log.warn("Failed to publish the message to RabbitMQ ", e);
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (rabbitMQChannel != null) {
+ rabbitMQChannel.close();
+ }
+ if (rabbitMQConnection != null) {
+ rabbitMQConnection.close();
+ }
+ }
+
+ private byte[] toBytes(Object obj) {
+ final byte[] result;
+ if (obj instanceof String) {
+ String s = (String) obj;
+ result = s.getBytes(StandardCharsets.UTF_8);
+ } else if (obj instanceof byte[]) {
+ result = (byte[]) obj;
+ } else {
+ throw new IllegalArgumentException("The value of the record must be String or Bytes.");
+ }
+ return result;
+ }
+}
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java
new file mode 100644
index 0000000..4e1c5b1
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.rabbitmq;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode(callSuper = false)
+@ToString
+@Accessors(chain = true)
+public class RabbitMQSinkConfig extends RabbitMQAbstractConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "The exchange to publish the messages on")
+ private String exchangeName;
+
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "The routing key used for publishing the messages")
+ private String routingKey;
+
+ public static RabbitMQSinkConfig load(String yamlFile) throws IOException {
+ ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+ return mapper.readValue(new File(yamlFile), RabbitMQSinkConfig.class);
+ }
+
+ public static RabbitMQSinkConfig load(Map<String, Object> map) throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.readValue(new ObjectMapper().writeValueAsString(map), RabbitMQSinkConfig.class);
+ }
+
+ @Override
+ public void validate() {
+ super.validate();
+ Preconditions.checkNotNull(exchangeName, "exchangeName property not set.");
+ Preconditions.checkNotNull(routingKey, "routingKey property not set.");
+ }
+}
diff --git a/pulsar-io/rabbitmq/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/rabbitmq/src/main/resources/META-INF/services/pulsar-io.yaml
index 8be5100..be65345 100644
--- a/pulsar-io/rabbitmq/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/rabbitmq/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -18,5 +18,6 @@
#
name: rabbitmq
-description: RabbitMQ source connector
+description: RabbitMQ source and sink connector
sourceClass: org.apache.pulsar.io.rabbitmq.RabbitMQSource
+sinkClass: org.apache.pulsar.io.rabbitmq.RabbitMQSink
\ No newline at end of file
diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java
new file mode 100644
index 0000000..1a4988b
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.rabbitmq.sink;
+
+import org.apache.pulsar.io.rabbitmq.RabbitMQSinkConfig;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+/**
+ * RabbitMQSinkConfig test
+ */
+public class RabbitMQSinkConfigTest {
+ @Test
+ public final void loadFromYamlFileTest() throws IOException {
+ File yamlFile = getFile("sinkConfig.yaml");
+ String path = yamlFile.getAbsolutePath();
+ RabbitMQSinkConfig config = RabbitMQSinkConfig.load(path);
+ assertNotNull(config);
+ assertEquals("localhost", config.getHost());
+ assertEquals(Integer.parseInt("5672"), config.getPort());
+ assertEquals("/", config.getVirtualHost());
+ assertEquals("guest", config.getUsername());
+ assertEquals("guest", config.getPassword());
+ assertEquals("test-queue", config.getQueueName());
+ assertEquals("test-connection", config.getConnectionName());
+ assertEquals(Integer.parseInt("0"), config.getRequestedChannelMax());
+ assertEquals(Integer.parseInt("0"), config.getRequestedFrameMax());
+ assertEquals(Integer.parseInt("60000"), config.getConnectionTimeout());
+ assertEquals(Integer.parseInt("10000"), config.getHandshakeTimeout());
+ assertEquals(Integer.parseInt("60"), config.getRequestedHeartbeat());
+ assertEquals("test-exchange", config.getExchangeName());
+ assertEquals("test-key", config.getRoutingKey());
+ }
+
+ @Test
+ public final void loadFromMapTest() throws IOException {
+ Map<String, Object> map = new HashMap<>();
+ map.put("host", "localhost");
+ map.put("port", "5672");
+ map.put("virtualHost", "/");
+ map.put("username", "guest");
+ map.put("password", "guest");
+ map.put("queueName", "test-queue");
+ map.put("connectionName", "test-connection");
+ map.put("requestedChannelMax", "0");
+ map.put("requestedFrameMax", "0");
+ map.put("connectionTimeout", "60000");
+ map.put("handshakeTimeout", "10000");
+ map.put("requestedHeartbeat", "60");
+ map.put("exchangeName", "test-exchange");
+ map.put("routingKey", "test-key");
+
+ RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map);
+ assertNotNull(config);
+ assertEquals("localhost", config.getHost());
+ assertEquals(Integer.parseInt("5672"), config.getPort());
+ assertEquals("/", config.getVirtualHost());
+ assertEquals("guest", config.getUsername());
+ assertEquals("guest", config.getPassword());
+ assertEquals("test-queue", config.getQueueName());
+ assertEquals("test-connection", config.getConnectionName());
+ assertEquals(Integer.parseInt("0"), config.getRequestedChannelMax());
+ assertEquals(Integer.parseInt("0"), config.getRequestedFrameMax());
+ assertEquals(Integer.parseInt("60000"), config.getConnectionTimeout());
+ assertEquals(Integer.parseInt("10000"), config.getHandshakeTimeout());
+ assertEquals(Integer.parseInt("60"), config.getRequestedHeartbeat());
+ assertEquals("test-exchange", config.getExchangeName());
+ assertEquals("test-key", config.getRoutingKey());
+ }
+
+ @Test
+ public final void validValidateTest() throws IOException {
+ Map<String, Object> map = new HashMap<>();
+ map.put("host", "localhost");
+ map.put("port", "5672");
+ map.put("virtualHost", "/");
+ map.put("username", "guest");
+ map.put("password", "guest");
+ map.put("queueName", "test-queue");
+ map.put("connectionName", "test-connection");
+ map.put("requestedChannelMax", "0");
+ map.put("requestedFrameMax", "0");
+ map.put("connectionTimeout", "60000");
+ map.put("handshakeTimeout", "10000");
+ map.put("requestedHeartbeat", "60");
+ map.put("exchangeName", "test-exchange");
+ map.put("routingKey", "test-key");
+
+ RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map);
+ config.validate();
+ }
+
+ @Test(expectedExceptions = NullPointerException.class,
+ expectedExceptionsMessageRegExp = "exchangeName property not set.")
+ public final void missingExchangeValidateTest() throws IOException {
+ Map<String, Object> map = new HashMap<>();
+ map.put("host", "localhost");
+ map.put("port", "5672");
+ map.put("virtualHost", "/");
+ map.put("username", "guest");
+ map.put("password", "guest");
+ map.put("queueName", "test-queue");
+ map.put("connectionName", "test-connection");
+ map.put("requestedChannelMax", "0");
+ map.put("requestedFrameMax", "0");
+ map.put("connectionTimeout", "60000");
+ map.put("handshakeTimeout", "10000");
+ map.put("requestedHeartbeat", "60");
+ map.put("routingKey", "test-key");
+
+ RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map);
+ config.validate();
+ }
+
+ private File getFile(String name) {
+ ClassLoader classLoader = getClass().getClassLoader();
+ return new File(classLoader.getResource(name).getFile());
+ }
+}
diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java
new file mode 100644
index 0000000..80675d1
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.rabbitmq.sink;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.instance.SinkRecord;
+import org.apache.pulsar.io.rabbitmq.RabbitMQBrokerManager;
+import org.apache.pulsar.io.rabbitmq.RabbitMQSink;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+public class RabbitMQSinkTest {
+ private RabbitMQBrokerManager rabbitMQBrokerManager;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ rabbitMQBrokerManager = new RabbitMQBrokerManager();
+ rabbitMQBrokerManager.startBroker();
+ }
+
+ @AfterMethod
+ public void tearDown() throws Exception {
+ rabbitMQBrokerManager.stopBroker();
+ }
+
+ @Test
+ public void TestOpenAndWriteSink() throws Exception {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put("host", "localhost");
+ configs.put("port", "5672");
+ configs.put("virtualHost", "default");
+ configs.put("username", "guest");
+ configs.put("password", "guest");
+ configs.put("queueName", "test-queue");
+ configs.put("connectionName", "test-connection");
+ configs.put("requestedChannelMax", "0");
+ configs.put("requestedFrameMax", "0");
+ configs.put("connectionTimeout", "60000");
+ configs.put("handshakeTimeout", "10000");
+ configs.put("requestedHeartbeat", "60");
+ configs.put("exchangeName", "test-exchange");
+ configs.put("routingKey", "test-key");
+
+ RabbitMQSink sink = new RabbitMQSink();
+
+ // open should success
+ sink.open(configs, null);
+
+ // write should success
+ Record<String> record = build("test-topic", "fakeKey", "fakeValue");
+ sink.write(record);
+
+ sink.close();
+ }
+
+ private Record<String> build(String topic, String key, String value) {
+ // prepare a SinkRecord
+ SinkRecord<String> record = new SinkRecord<>(new Record<String>() {
+ @Override
+ public Optional<String> getKey() {
+ return Optional.empty();
+ }
+
+ @Override
+ public String getValue() {
+ return key;
+ }
+
+ @Override
+ public Optional<String> getDestinationTopic() {
+ if (topic != null) {
+ return Optional.of(topic);
+ } else {
+ return Optional.empty();
+ }
+ }
+ }, value);
+ return record;
+ }
+}
diff --git a/pulsar-io/rabbitmq/src/test/resources/sinkConfig.yaml b/pulsar-io/rabbitmq/src/test/resources/sinkConfig.yaml
new file mode 100644
index 0000000..a8c46b8
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/test/resources/sinkConfig.yaml
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+"host": "localhost",
+"port": "5672",
+"virtualHost": "/",
+"username": "guest",
+"password": "guest",
+"queueName": "test-queue",
+"connectionName": "test-connection",
+"requestedChannelMax": "0",
+"requestedFrameMax": "0",
+"connectionTimeout": "60000",
+"handshakeTimeout": "10000",
+"requestedHeartbeat": "60",
+"exchangeName": "test-exchange",
+"routingKey": "test-key"
+
+}