Add a Pulsar IO connector for RabbitMQ sink. (#3967)

### Motivation

Provides a builtin RabbitMQ Sink Connector, in order to persist pulsar messages to a RabbitMQ queue.

### Modifications

Add a RabbitMQ Sink and some unit tests.

### Verifying this change

This change can be verified as follows:

* deploy the RabbitMQ sink connector with configuration file containing the following fields:

```
configs:
    host: "localhost"
    port: "5672"
    virtualHost: "/"
    username: "quest"
    password: "quest"
    queueName: "test_queue"
    connectionName: "test_connection"
    exchangeName: "test_exchange"
    routingKey: "test_routing"
```
* deploy an RabbitMQ cluster and create the above elements
* send messages in the topic with specified schema declared when deploying the connector
* use `rabbitmqadmin` or `rabbitmqctl` to query messages in the specified queue
diff --git a/pulsar-io/rabbitmq/pom.xml b/pulsar-io/rabbitmq/pom.xml
index 5c75068..008ee06 100644
--- a/pulsar-io/rabbitmq/pom.xml
+++ b/pulsar-io/rabbitmq/pom.xml
@@ -37,6 +37,22 @@
       <artifactId>pulsar-io-core</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-functions-instance</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-slf4j-impl</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-client-original</artifactId>
+      <version>${project.version}</version>
+    </dependency>
 
     <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
new file mode 100644
index 0000000..43a980f
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.rabbitmq;
+
+import com.rabbitmq.client.BuiltinExchangeType;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/**
+ * A Simple RabbitMQ sink, which transfer records from Pulsar to RabbitMQ.
+ * This class expects records from Pulsar to have values that are stored as bytes or string.
+ */
+@Connector(
+    name = "rabbitmq",
+    type = IOType.SINK,
+    help = "A sink connector is used for moving messages from Pulsar to RabbitMQ.",
+    configClass = RabbitMQSinkConfig.class
+)
+@Slf4j
+public class RabbitMQSink<T> implements Sink<T> {
+
+    private Connection rabbitMQConnection;
+    private Channel rabbitMQChannel;
+    private RabbitMQSinkConfig rabbitMQSinkConfig;
+    private String exchangeName;
+    private String routingKey;
+
+    @Override
+    public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
+        rabbitMQSinkConfig = RabbitMQSinkConfig.load(config);
+        rabbitMQSinkConfig.validate();
+
+        ConnectionFactory connectionFactory = rabbitMQSinkConfig.createConnectionFactory();
+        rabbitMQConnection = connectionFactory.newConnection(rabbitMQSinkConfig.getConnectionName());
+        log.info("A new connection to {}:{} has been opened successfully.",
+            rabbitMQConnection.getAddress().getCanonicalHostName(),
+            rabbitMQConnection.getPort()
+        );
+
+        exchangeName = rabbitMQSinkConfig.getExchangeName();
+        routingKey = rabbitMQSinkConfig.getRoutingKey();
+
+        rabbitMQChannel = rabbitMQConnection.createChannel();
+
+        // several clients share a queue
+        rabbitMQChannel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);
+        rabbitMQChannel.queueDeclare(rabbitMQSinkConfig.getQueueName(), true, false, false, null);
+        rabbitMQChannel.queueBind(rabbitMQSinkConfig.getQueueName(), exchangeName, routingKey);
+    }
+
+    @Override
+    public void write(Record<T> record) {
+        byte[] value = toBytes(record.getValue());
+        try {
+            rabbitMQChannel.basicPublish(exchangeName, routingKey, null, value);
+            record.ack();
+        } catch (IOException e) {
+            record.fail();
+            log.warn("Failed to publish the message to RabbitMQ ", e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (rabbitMQChannel != null) {
+            rabbitMQChannel.close();
+        }
+        if (rabbitMQConnection != null) {
+            rabbitMQConnection.close();
+        }
+    }
+
+    private byte[] toBytes(Object obj) {
+        final byte[] result;
+        if (obj instanceof String) {
+            String s = (String) obj;
+            result = s.getBytes(StandardCharsets.UTF_8);
+        } else if (obj instanceof byte[]) {
+            result = (byte[]) obj;
+        } else {
+            throw new IllegalArgumentException("The value of the record must be String or Bytes.");
+        }
+        return result;
+    }
+}
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java
new file mode 100644
index 0000000..4e1c5b1
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.rabbitmq;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode(callSuper = false)
+@ToString
+@Accessors(chain = true)
+public class RabbitMQSinkConfig extends RabbitMQAbstractConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The exchange to publish the messages on")
+    private String exchangeName;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The routing key used for publishing the messages")
+    private String routingKey;
+
+    public static RabbitMQSinkConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), RabbitMQSinkConfig.class);
+    }
+
+    public static RabbitMQSinkConfig load(Map<String, Object> map) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(new ObjectMapper().writeValueAsString(map), RabbitMQSinkConfig.class);
+    }
+
+    @Override
+    public void validate() {
+        super.validate();
+        Preconditions.checkNotNull(exchangeName, "exchangeName property not set.");
+        Preconditions.checkNotNull(routingKey, "routingKey property not set.");
+    }
+}
diff --git a/pulsar-io/rabbitmq/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/rabbitmq/src/main/resources/META-INF/services/pulsar-io.yaml
index 8be5100..be65345 100644
--- a/pulsar-io/rabbitmq/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/rabbitmq/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -18,5 +18,6 @@
 #
 
 name: rabbitmq
-description: RabbitMQ source connector
+description: RabbitMQ source and sink connector
 sourceClass: org.apache.pulsar.io.rabbitmq.RabbitMQSource
+sinkClass: org.apache.pulsar.io.rabbitmq.RabbitMQSink
\ No newline at end of file
diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java
new file mode 100644
index 0000000..1a4988b
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkConfigTest.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.rabbitmq.sink;
+
+import org.apache.pulsar.io.rabbitmq.RabbitMQSinkConfig;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+/**
+ * RabbitMQSinkConfig test
+ */
+public class RabbitMQSinkConfigTest {
+    @Test
+    public final void loadFromYamlFileTest() throws IOException {
+        File yamlFile = getFile("sinkConfig.yaml");
+        String path = yamlFile.getAbsolutePath();
+        RabbitMQSinkConfig config = RabbitMQSinkConfig.load(path);
+        assertNotNull(config);
+        assertEquals("localhost", config.getHost());
+        assertEquals(Integer.parseInt("5672"), config.getPort());
+        assertEquals("/", config.getVirtualHost());
+        assertEquals("guest", config.getUsername());
+        assertEquals("guest", config.getPassword());
+        assertEquals("test-queue", config.getQueueName());
+        assertEquals("test-connection", config.getConnectionName());
+        assertEquals(Integer.parseInt("0"), config.getRequestedChannelMax());
+        assertEquals(Integer.parseInt("0"), config.getRequestedFrameMax());
+        assertEquals(Integer.parseInt("60000"), config.getConnectionTimeout());
+        assertEquals(Integer.parseInt("10000"), config.getHandshakeTimeout());
+        assertEquals(Integer.parseInt("60"), config.getRequestedHeartbeat());
+        assertEquals("test-exchange", config.getExchangeName());
+        assertEquals("test-key", config.getRoutingKey());
+    }
+
+    @Test
+    public final void loadFromMapTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("host", "localhost");
+        map.put("port", "5672");
+        map.put("virtualHost", "/");
+        map.put("username", "guest");
+        map.put("password", "guest");
+        map.put("queueName", "test-queue");
+        map.put("connectionName", "test-connection");
+        map.put("requestedChannelMax", "0");
+        map.put("requestedFrameMax", "0");
+        map.put("connectionTimeout", "60000");
+        map.put("handshakeTimeout", "10000");
+        map.put("requestedHeartbeat", "60");
+        map.put("exchangeName", "test-exchange");
+        map.put("routingKey", "test-key");
+
+        RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map);
+        assertNotNull(config);
+        assertEquals("localhost", config.getHost());
+        assertEquals(Integer.parseInt("5672"), config.getPort());
+        assertEquals("/", config.getVirtualHost());
+        assertEquals("guest", config.getUsername());
+        assertEquals("guest", config.getPassword());
+        assertEquals("test-queue", config.getQueueName());
+        assertEquals("test-connection", config.getConnectionName());
+        assertEquals(Integer.parseInt("0"), config.getRequestedChannelMax());
+        assertEquals(Integer.parseInt("0"), config.getRequestedFrameMax());
+        assertEquals(Integer.parseInt("60000"), config.getConnectionTimeout());
+        assertEquals(Integer.parseInt("10000"), config.getHandshakeTimeout());
+        assertEquals(Integer.parseInt("60"), config.getRequestedHeartbeat());
+        assertEquals("test-exchange", config.getExchangeName());
+        assertEquals("test-key", config.getRoutingKey());
+    }
+
+    @Test
+    public final void validValidateTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("host", "localhost");
+        map.put("port", "5672");
+        map.put("virtualHost", "/");
+        map.put("username", "guest");
+        map.put("password", "guest");
+        map.put("queueName", "test-queue");
+        map.put("connectionName", "test-connection");
+        map.put("requestedChannelMax", "0");
+        map.put("requestedFrameMax", "0");
+        map.put("connectionTimeout", "60000");
+        map.put("handshakeTimeout", "10000");
+        map.put("requestedHeartbeat", "60");
+        map.put("exchangeName", "test-exchange");
+        map.put("routingKey", "test-key");
+
+        RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map);
+        config.validate();
+    }
+
+    @Test(expectedExceptions = NullPointerException.class,
+        expectedExceptionsMessageRegExp = "exchangeName property not set.")
+    public final void missingExchangeValidateTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("host", "localhost");
+        map.put("port", "5672");
+        map.put("virtualHost", "/");
+        map.put("username", "guest");
+        map.put("password", "guest");
+        map.put("queueName", "test-queue");
+        map.put("connectionName", "test-connection");
+        map.put("requestedChannelMax", "0");
+        map.put("requestedFrameMax", "0");
+        map.put("connectionTimeout", "60000");
+        map.put("handshakeTimeout", "10000");
+        map.put("requestedHeartbeat", "60");
+        map.put("routingKey", "test-key");
+
+        RabbitMQSinkConfig config = RabbitMQSinkConfig.load(map);
+        config.validate();
+    }
+
+    private File getFile(String name) {
+        ClassLoader classLoader = getClass().getClassLoader();
+        return new File(classLoader.getResource(name).getFile());
+    }
+}
diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java
new file mode 100644
index 0000000..80675d1
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/sink/RabbitMQSinkTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.rabbitmq.sink;
+
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.instance.SinkRecord;
+import org.apache.pulsar.io.rabbitmq.RabbitMQBrokerManager;
+import org.apache.pulsar.io.rabbitmq.RabbitMQSink;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+public class RabbitMQSinkTest {
+    private RabbitMQBrokerManager rabbitMQBrokerManager;
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        rabbitMQBrokerManager = new RabbitMQBrokerManager();
+        rabbitMQBrokerManager.startBroker();
+    }
+
+    @AfterMethod
+    public void tearDown() throws Exception {
+        rabbitMQBrokerManager.stopBroker();
+    }
+
+    @Test
+    public void TestOpenAndWriteSink() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put("host", "localhost");
+        configs.put("port", "5672");
+        configs.put("virtualHost", "default");
+        configs.put("username", "guest");
+        configs.put("password", "guest");
+        configs.put("queueName", "test-queue");
+        configs.put("connectionName", "test-connection");
+        configs.put("requestedChannelMax", "0");
+        configs.put("requestedFrameMax", "0");
+        configs.put("connectionTimeout", "60000");
+        configs.put("handshakeTimeout", "10000");
+        configs.put("requestedHeartbeat", "60");
+        configs.put("exchangeName", "test-exchange");
+        configs.put("routingKey", "test-key");
+
+        RabbitMQSink sink = new RabbitMQSink();
+
+        // open should success
+        sink.open(configs, null);
+
+        // write should success
+        Record<String> record = build("test-topic", "fakeKey", "fakeValue");
+        sink.write(record);
+
+        sink.close();
+    }
+
+    private Record<String> build(String topic, String key, String value) {
+        // prepare a SinkRecord
+        SinkRecord<String> record = new SinkRecord<>(new Record<String>() {
+            @Override
+            public Optional<String> getKey() {
+                return Optional.empty();
+            }
+
+            @Override
+            public String getValue() {
+                return key;
+            }
+
+            @Override
+            public Optional<String> getDestinationTopic() {
+                if (topic != null) {
+                    return Optional.of(topic);
+                } else {
+                    return Optional.empty();
+                }
+            }
+        }, value);
+        return record;
+    }
+}
diff --git a/pulsar-io/rabbitmq/src/test/resources/sinkConfig.yaml b/pulsar-io/rabbitmq/src/test/resources/sinkConfig.yaml
new file mode 100644
index 0000000..a8c46b8
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/test/resources/sinkConfig.yaml
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+"host": "localhost",
+"port": "5672",
+"virtualHost": "/",
+"username": "guest",
+"password": "guest",
+"queueName": "test-queue",
+"connectionName": "test-connection",
+"requestedChannelMax": "0",
+"requestedFrameMax": "0",
+"connectionTimeout": "60000",
+"handshakeTimeout": "10000",
+"requestedHeartbeat": "60",
+"exchangeName": "test-exchange",
+"routingKey": "test-key"
+
+}