Some minor improvement of RabbitMQ IO Source. (#3937)

### Motivation

This PR try to add some specific config for RabbitMQ IO Source to support setting `basicQos` and `ack`. 

### Modifications

* Add an abstract `RabbitMQAbstractConfig` and `RabbitMQSourceConfig` inherits from it.
* Add some unit tests.
diff --git a/pulsar-io/rabbitmq/pom.xml b/pulsar-io/rabbitmq/pom.xml
index 3dac871..5c75068 100644
--- a/pulsar-io/rabbitmq/pom.xml
+++ b/pulsar-io/rabbitmq/pom.xml
@@ -49,11 +49,23 @@
     </dependency>
 
     <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>com.rabbitmq</groupId>
       <artifactId>amqp-client</artifactId>
       <version>${rabbitmq-client.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.qpid</groupId>
+      <artifactId>qpid-broker</artifactId>
+      <version>6.1.6</version>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <build>
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQAbstractConfig.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQAbstractConfig.java
new file mode 100644
index 0000000..97174d0
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQAbstractConfig.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.rabbitmq;
+
+import com.google.common.base.Preconditions;
+import com.rabbitmq.client.ConnectionFactory;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.Serializable;
+
+/**
+ * Configuration object for all RabbitMQ components.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public class RabbitMQAbstractConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The connection name used for connecting to RabbitMQ")
+    private String connectionName;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The RabbitMQ host to connect to")
+    private String host;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "5672",
+        help = "The RabbitMQ port to connect to")
+    private int port = 5672;
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "/",
+        help = "The virtual host used for connecting to RabbitMQ")
+    private String virtualHost = "/";
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "guest",
+        help = "The username used to authenticate to RabbitMQ")
+    private String username = "guest";
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "guest",
+        help = "The password used to authenticate to RabbitMQ")
+    private String password = "guest";
+
+    @FieldDoc(
+        required = true,
+        defaultValue = "",
+        help = "The RabbitMQ queue name from which messages should be read from or written to")
+    private String queueName;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "0",
+        help = "Initially requested maximum channel number. 0 for unlimited")
+    private int requestedChannelMax = 0;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "0",
+        help = "Initially requested maximum frame size, in octets. 0 for unlimited")
+    private int requestedFrameMax = 0;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "60000",
+        help = "Connection TCP establishment timeout in milliseconds. 0 for infinite")
+    private int connectionTimeout = 60000;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "10000",
+        help = "The AMQP0-9-1 protocol handshake timeout in milliseconds")
+    private int handshakeTimeout = 10000;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "60",
+        help = "The requested heartbeat timeout in seconds")
+    private int requestedHeartbeat = 60;
+
+    public void validate() {
+        Preconditions.checkNotNull(host, "host property not set.");
+        Preconditions.checkNotNull(port, "port property not set.");
+        Preconditions.checkNotNull(virtualHost, "virtualHost property not set.");
+        Preconditions.checkNotNull(connectionName, "connectionName property not set.");
+        Preconditions.checkNotNull(queueName, "queueName property not set.");
+    }
+
+    public ConnectionFactory createConnectionFactory() {
+        ConnectionFactory connectionFactory = new ConnectionFactory();
+        connectionFactory.setHost(this.host);
+        connectionFactory.setUsername(this.username);
+        connectionFactory.setPassword(this.password);
+        connectionFactory.setVirtualHost(this.virtualHost);
+        connectionFactory.setRequestedChannelMax(this.requestedChannelMax);
+        connectionFactory.setRequestedFrameMax(this.requestedFrameMax);
+        connectionFactory.setConnectionTimeout(this.connectionTimeout);
+        connectionFactory.setHandshakeTimeout(this.handshakeTimeout);
+        connectionFactory.setRequestedHeartbeat(this.requestedHeartbeat);
+        connectionFactory.setPort(this.port);
+        return connectionFactory;
+    }
+}
\ No newline at end of file
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQConfig.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQConfig.java
deleted file mode 100644
index f4fd61f..0000000
--- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQConfig.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pulsar.io.rabbitmq;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
-import lombok.*;
-import lombok.experimental.Accessors;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Map;
-import org.apache.pulsar.io.core.annotations.FieldDoc;
-
-/**
- * RabbitMQ source connector config.
- */
-@Data
-@Setter
-@Getter
-@EqualsAndHashCode
-@ToString
-@Accessors(chain = true)
-public class RabbitMQConfig implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    @FieldDoc(
-        required = true,
-        defaultValue = "",
-        help = "The connection name used for connecting to RabbitMQ")
-    private String connectionName;
-    @FieldDoc(
-        required = true,
-        defaultValue = "",
-        help = "The AMQ uri used for connecting to RabbitMQ")
-    private String amqUri;
-    @FieldDoc(
-        required = true,
-        defaultValue = "",
-        help = "The RabbitMQ queue name")
-    private String queueName;
-
-    public static RabbitMQConfig load(String yamlFile) throws IOException {
-        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
-        return mapper.readValue(new File(yamlFile), RabbitMQConfig.class);
-    }
-
-    public static RabbitMQConfig load(Map<String, Object> map) throws IOException {
-        ObjectMapper mapper = new ObjectMapper();
-        return mapper.readValue(new ObjectMapper().writeValueAsString(map), RabbitMQConfig.class);
-    }
-}
\ No newline at end of file
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
index 6bb9613..a261373 100644
--- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
@@ -47,34 +47,36 @@
     name = "rabbitmq",
     type = IOType.SOURCE,
     help = "A simple connector to move messages from a RabbitMQ queue to a Pulsar topic",
-    configClass = RabbitMQConfig.class)
+    configClass = RabbitMQSourceConfig.class)
 public class RabbitMQSource extends PushSource<byte[]> {
 
     private static Logger logger = LoggerFactory.getLogger(RabbitMQSource.class);
 
     private Connection rabbitMQConnection;
     private Channel rabbitMQChannel;
-    private RabbitMQConfig rabbitMQConfig;
+    private RabbitMQSourceConfig rabbitMQSourceConfig;
 
     @Override
     public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
-        rabbitMQConfig = RabbitMQConfig.load(config);
-        if (rabbitMQConfig.getAmqUri() == null
-                || rabbitMQConfig.getQueueName() == null) {
-            throw new IllegalArgumentException("Required property not set.");
-        }
-        ConnectionFactory connectionFactory = new ConnectionFactory();
-        connectionFactory.setUri(rabbitMQConfig.getAmqUri());
-        rabbitMQConnection = connectionFactory.newConnection(rabbitMQConfig.getConnectionName());
+        rabbitMQSourceConfig = RabbitMQSourceConfig.load(config);
+        rabbitMQSourceConfig.validate();
+
+        ConnectionFactory connectionFactory = rabbitMQSourceConfig.createConnectionFactory();
+        rabbitMQConnection = connectionFactory.newConnection(rabbitMQSourceConfig.getConnectionName());
         logger.info("A new connection to {}:{} has been opened successfully.",
                 rabbitMQConnection.getAddress().getCanonicalHostName(),
                 rabbitMQConnection.getPort()
         );
         rabbitMQChannel = rabbitMQConnection.createChannel();
-        rabbitMQChannel.queueDeclare(rabbitMQConfig.getQueueName(), false, false, false, null);
+        rabbitMQChannel.queueDeclare(rabbitMQSourceConfig.getQueueName(), false, false, false, null);
+        logger.info("Setting channel.basicQos({}, {}).",
+                rabbitMQSourceConfig.getPrefetchCount(),
+                rabbitMQSourceConfig.isPrefetchGlobal()
+        );
+        rabbitMQChannel.basicQos(rabbitMQSourceConfig.getPrefetchCount(), rabbitMQSourceConfig.isPrefetchGlobal());
         com.rabbitmq.client.Consumer consumer = new RabbitMQConsumer(this, rabbitMQChannel);
-        rabbitMQChannel.basicConsume(rabbitMQConfig.getQueueName(), consumer);
-        logger.info("A consumer for queue {} has been successfully started.", rabbitMQConfig.getQueueName());
+        rabbitMQChannel.basicConsume(rabbitMQSourceConfig.getQueueName(), consumer);
+        logger.info("A consumer for queue {} has been successfully started.", rabbitMQSourceConfig.getQueueName());
     }
 
     @Override
@@ -94,6 +96,10 @@
         @Override
         public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
             source.consume(new RabbitMQRecord(Optional.ofNullable(envelope.getRoutingKey()), body));
+            long deliveryTag = envelope.getDeliveryTag();
+            // positively acknowledge all deliveries up to this delivery tag to reduce network traffic
+            // since manual message acknowledgments are turned on by default
+            this.getChannel().basicAck(deliveryTag, true);
         }
     }
 
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSourceConfig.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSourceConfig.java
new file mode 100644
index 0000000..20508c0
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSourceConfig.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.rabbitmq;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode(callSuper = false)
+@ToString
+@Accessors(chain = true)
+public class RabbitMQSourceConfig extends RabbitMQAbstractConfig implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "0",
+        help = "Maximum number of messages that the server will deliver, 0 for unlimited")
+    private int prefetchCount = 0;
+
+    @FieldDoc(
+        required = false,
+        defaultValue = "false",
+        help = "Set true if the settings should be applied to the entire channel rather than each consumer")
+    private boolean prefetchGlobal = false;
+
+    public static RabbitMQSourceConfig load(String yamlFile) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(yamlFile), RabbitMQSourceConfig.class);
+    }
+
+    public static RabbitMQSourceConfig load(Map<String, Object> map) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        return mapper.readValue(new ObjectMapper().writeValueAsString(map), RabbitMQSourceConfig.class);
+    }
+
+    @Override
+    public void validate() {
+        super.validate();
+        Preconditions.checkArgument(prefetchCount >= 0, "prefetchCount must be non-negative.");
+    }
+}
diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/RabbitMQBrokerManager.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/RabbitMQBrokerManager.java
new file mode 100644
index 0000000..cdc8bea
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/RabbitMQBrokerManager.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.rabbitmq;
+
+import org.apache.qpid.server.Broker;
+import org.apache.qpid.server.BrokerOptions;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+public class RabbitMQBrokerManager {
+
+    private final String PORT = "5672";
+    private final Broker broker = new Broker();
+
+
+    public void startBroker() throws Exception {
+        BrokerOptions brokerOptions = getBrokerOptions();
+        broker.startup(brokerOptions);
+    }
+
+    public void stopBroker() {
+        broker.shutdown();
+    }
+
+    BrokerOptions getBrokerOptions() throws Exception {
+        Path tmpFolder = Files.createTempDirectory("qpidWork");
+        Path homeFolder = Files.createTempDirectory("qpidHome");
+        File etc = new File(homeFolder.toFile(), "etc");
+        etc.mkdir();
+        FileOutputStream fos = new FileOutputStream(new File(etc, "passwd"));
+        fos.write("guest:guest\n".getBytes());
+        fos.close();
+
+        BrokerOptions brokerOptions = new BrokerOptions();
+
+        brokerOptions.setConfigProperty("qpid.work_dir", tmpFolder.toAbsolutePath().toString());
+        brokerOptions.setConfigProperty("qpid.amqp_port", PORT);
+        brokerOptions.setConfigProperty("qpid.home_dir", homeFolder.toAbsolutePath().toString());
+        String configPath = getFile("qpid.json").getAbsolutePath();
+        brokerOptions.setInitialConfigurationLocation(configPath);
+
+        return brokerOptions;
+    }
+
+    private File getFile(String name) {
+        ClassLoader classLoader = getClass().getClassLoader();
+        return new File(classLoader.getResource(name).getFile());
+    }
+}
diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceConfigTest.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceConfigTest.java
new file mode 100644
index 0000000..a7e55dd
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceConfigTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.rabbitmq.source;
+
+import org.apache.pulsar.io.rabbitmq.RabbitMQSourceConfig;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+/**
+ * RabbitMQSourceConfig test
+ */
+public class RabbitMQSourceConfigTest {
+    @Test
+    public final void loadFromYamlFileTest() throws IOException {
+        File yamlFile = getFile("sourceConfig.yaml");
+        String path = yamlFile.getAbsolutePath();
+        RabbitMQSourceConfig config = RabbitMQSourceConfig.load(path);
+        assertNotNull(config);
+        assertEquals("localhost", config.getHost());
+        assertEquals(Integer.parseInt("5672"), config.getPort());
+        assertEquals("/", config.getVirtualHost());
+        assertEquals("guest", config.getUsername());
+        assertEquals("guest", config.getPassword());
+        assertEquals("test-queue", config.getQueueName());
+        assertEquals("test-connection", config.getConnectionName());
+        assertEquals(Integer.parseInt("0"), config.getRequestedChannelMax());
+        assertEquals(Integer.parseInt("0"), config.getRequestedFrameMax());
+        assertEquals(Integer.parseInt("60000"), config.getConnectionTimeout());
+        assertEquals(Integer.parseInt("10000"), config.getHandshakeTimeout());
+        assertEquals(Integer.parseInt("60"), config.getRequestedHeartbeat());
+        assertEquals(Integer.parseInt("0"), config.getPrefetchCount());
+        assertEquals(Boolean.parseBoolean("false"), config.isPrefetchGlobal());
+    }
+
+    @Test
+    public final void loadFromMapTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("host", "localhost");
+        map.put("port", "5672");
+        map.put("virtualHost", "/");
+        map.put("username", "guest");
+        map.put("password", "guest");
+        map.put("queueName", "test-queue");
+        map.put("connectionName", "test-connection");
+        map.put("requestedChannelMax", "0");
+        map.put("requestedFrameMax", "0");
+        map.put("connectionTimeout", "60000");
+        map.put("handshakeTimeout", "10000");
+        map.put("requestedHeartbeat", "60");
+        map.put("prefetchCount", "0");
+        map.put("prefetchGlobal", "false");
+
+        RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map);
+        assertNotNull(config);
+        assertEquals("localhost", config.getHost());
+        assertEquals(Integer.parseInt("5672"), config.getPort());
+        assertEquals("/", config.getVirtualHost());
+        assertEquals("guest", config.getUsername());
+        assertEquals("guest", config.getPassword());
+        assertEquals("test-queue", config.getQueueName());
+        assertEquals("test-connection", config.getConnectionName());
+        assertEquals(Integer.parseInt("0"), config.getRequestedChannelMax());
+        assertEquals(Integer.parseInt("0"), config.getRequestedFrameMax());
+        assertEquals(Integer.parseInt("60000"), config.getConnectionTimeout());
+        assertEquals(Integer.parseInt("10000"), config.getHandshakeTimeout());
+        assertEquals(Integer.parseInt("60"), config.getRequestedHeartbeat());
+        assertEquals(Integer.parseInt("0"), config.getPrefetchCount());
+        assertEquals(Boolean.parseBoolean("false"), config.isPrefetchGlobal());
+    }
+
+    @Test
+    public final void validValidateTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("host", "localhost");
+        map.put("port", "5672");
+        map.put("virtualHost", "/");
+        map.put("username", "guest");
+        map.put("password", "guest");
+        map.put("queueName", "test-queue");
+        map.put("connectionName", "test-connection");
+        map.put("requestedChannelMax", "0");
+        map.put("requestedFrameMax", "0");
+        map.put("connectionTimeout", "60000");
+        map.put("handshakeTimeout", "10000");
+        map.put("requestedHeartbeat", "60");
+        map.put("prefetchCount", "0");
+        map.put("prefetchGlobal", "false");
+
+        RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map);
+        config.validate();
+    }
+
+    @Test(expectedExceptions = NullPointerException.class,
+        expectedExceptionsMessageRegExp = "host property not set.")
+    public final void missingHostValidateTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("port", "5672");
+        map.put("virtualHost", "/");
+        map.put("username", "guest");
+        map.put("password", "guest");
+        map.put("queueName", "test-queue");
+        map.put("connectionName", "test-connection");
+        map.put("requestedChannelMax", "0");
+        map.put("requestedFrameMax", "0");
+        map.put("connectionTimeout", "60000");
+        map.put("handshakeTimeout", "10000");
+        map.put("requestedHeartbeat", "60");
+        map.put("prefetchCount", "0");
+        map.put("prefetchGlobal", "false");
+
+        RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map);
+        config.validate();
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class,
+        expectedExceptionsMessageRegExp = "prefetchCount must be non-negative.")
+    public final void invalidPrefetchCountTest() throws IOException {
+        Map<String, Object> map = new HashMap<>();
+        map.put("host", "localhost");
+        map.put("port", "5672");
+        map.put("virtualHost", "/");
+        map.put("username", "guest");
+        map.put("password", "guest");
+        map.put("queueName", "test-queue");
+        map.put("connectionName", "test-connection");
+        map.put("requestedChannelMax", "0");
+        map.put("requestedFrameMax", "0");
+        map.put("connectionTimeout", "60000");
+        map.put("handshakeTimeout", "10000");
+        map.put("requestedHeartbeat", "60");
+        map.put("prefetchCount", "-100");
+        map.put("prefetchGlobal", "false");
+
+        RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map);
+        config.validate();
+    }
+
+    private File getFile(String name) {
+        ClassLoader classLoader = getClass().getClassLoader();
+        return new File(classLoader.getResource(name).getFile());
+    }
+}
diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceTest.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceTest.java
new file mode 100644
index 0000000..11f27cb
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.rabbitmq.source;
+
+import org.apache.pulsar.io.rabbitmq.RabbitMQBrokerManager;
+import org.apache.pulsar.io.rabbitmq.RabbitMQSource;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class RabbitMQSourceTest {
+
+    private RabbitMQBrokerManager rabbitMQBrokerManager;
+
+    @BeforeMethod
+    public void setUp() throws Exception {
+        rabbitMQBrokerManager = new RabbitMQBrokerManager();
+        rabbitMQBrokerManager.startBroker();
+    }
+
+    @AfterMethod
+    public void tearDown() throws Exception {
+        rabbitMQBrokerManager.stopBroker();
+    }
+
+    @Test
+    public void TestOpenAndWriteSink() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put("host", "localhost");
+        configs.put("port", "5672");
+        configs.put("virtualHost", "default");
+        configs.put("username", "guest");
+        configs.put("password", "guest");
+        configs.put("queueName", "test-queue");
+        configs.put("connectionName", "test-connection");
+        configs.put("requestedChannelMax", "0");
+        configs.put("requestedFrameMax", "0");
+        configs.put("connectionTimeout", "60000");
+        configs.put("handshakeTimeout", "10000");
+        configs.put("requestedHeartbeat", "60");
+        configs.put("prefetchCount", "0");
+        configs.put("prefetchGlobal", "false");
+
+        RabbitMQSource source = new RabbitMQSource();
+
+        // open should success
+        source.open(configs, null);
+    }
+
+}
diff --git a/pulsar-io/rabbitmq/src/test/resources/qpid.json b/pulsar-io/rabbitmq/src/test/resources/qpid.json
new file mode 100644
index 0000000..6a0381f
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/test/resources/qpid.json
@@ -0,0 +1,37 @@
+{
+  "name": "EmbeddedBroker",
+  "modelVersion": "2.0",
+  "storeVersion": 1,
+  "authenticationproviders": [
+    {
+      "name": "noPassword",
+      "type": "Anonymous",
+      "secureOnlyMechanisms": []
+    },
+    {
+      "name": "passwordFile",
+      "type": "PlainPasswordFile",
+      "path": "${qpid.home_dir}${file.separator}etc${file.separator}passwd",
+      "secureOnlyMechanisms": []
+    }
+  ],
+  "ports": [
+    {
+      "name": "AMQP",
+      "port": "${qpid.amqp_port}",
+      "authenticationProvider": "passwordFile",
+      "protocols": [
+        "AMQP_0_9_1"
+      ]
+    }
+  ],
+  "virtualhostnodes": [
+    {
+      "name": "default",
+      "type": "JSON",
+      "defaultVirtualHostNode": "true",
+      "virtualHostInitialConfiguration": "${qpid.initial_config_virtualhost_config}",
+      "storeType": "DERBY"
+    }
+  ]
+}
\ No newline at end of file
diff --git a/pulsar-io/rabbitmq/src/test/resources/sourceConfig.yaml b/pulsar-io/rabbitmq/src/test/resources/sourceConfig.yaml
new file mode 100644
index 0000000..4ec558f
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/test/resources/sourceConfig.yaml
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+"host": "localhost",
+"port": "5672",
+"virtualHost": "/",
+"username": "guest",
+"password": "guest",
+"queueName": "test-queue",
+"connectionName": "test-connection",
+"requestedChannelMax": "0",
+"requestedFrameMax": "0",
+"connectionTimeout": "60000",
+"handshakeTimeout": "10000",
+"requestedHeartbeat": "60",
+"prefetchCount": "0",
+"prefetchGlobal": "false"
+
+}