Some minor improvement of RabbitMQ IO Source. (#3937)
### Motivation
This PR try to add some specific config for RabbitMQ IO Source to support setting `basicQos` and `ack`.
### Modifications
* Add an abstract `RabbitMQAbstractConfig` and `RabbitMQSourceConfig` inherits from it.
* Add some unit tests.
diff --git a/pulsar-io/rabbitmq/pom.xml b/pulsar-io/rabbitmq/pom.xml
index 3dac871..5c75068 100644
--- a/pulsar-io/rabbitmq/pom.xml
+++ b/pulsar-io/rabbitmq/pom.xml
@@ -49,11 +49,23 @@
</dependency>
<dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>${rabbitmq-client.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-broker</artifactId>
+ <version>6.1.6</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQAbstractConfig.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQAbstractConfig.java
new file mode 100644
index 0000000..97174d0
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQAbstractConfig.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.rabbitmq;
+
+import com.google.common.base.Preconditions;
+import com.rabbitmq.client.ConnectionFactory;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.Serializable;
+
+/**
+ * Configuration object for all RabbitMQ components.
+ */
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode
+@ToString
+@Accessors(chain = true)
+public class RabbitMQAbstractConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "The connection name used for connecting to RabbitMQ")
+ private String connectionName;
+
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "The RabbitMQ host to connect to")
+ private String host;
+
+ @FieldDoc(
+ required = true,
+ defaultValue = "5672",
+ help = "The RabbitMQ port to connect to")
+ private int port = 5672;
+
+ @FieldDoc(
+ required = true,
+ defaultValue = "/",
+ help = "The virtual host used for connecting to RabbitMQ")
+ private String virtualHost = "/";
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "guest",
+ help = "The username used to authenticate to RabbitMQ")
+ private String username = "guest";
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "guest",
+ help = "The password used to authenticate to RabbitMQ")
+ private String password = "guest";
+
+ @FieldDoc(
+ required = true,
+ defaultValue = "",
+ help = "The RabbitMQ queue name from which messages should be read from or written to")
+ private String queueName;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "0",
+ help = "Initially requested maximum channel number. 0 for unlimited")
+ private int requestedChannelMax = 0;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "0",
+ help = "Initially requested maximum frame size, in octets. 0 for unlimited")
+ private int requestedFrameMax = 0;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "60000",
+ help = "Connection TCP establishment timeout in milliseconds. 0 for infinite")
+ private int connectionTimeout = 60000;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "10000",
+ help = "The AMQP0-9-1 protocol handshake timeout in milliseconds")
+ private int handshakeTimeout = 10000;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "60",
+ help = "The requested heartbeat timeout in seconds")
+ private int requestedHeartbeat = 60;
+
+ public void validate() {
+ Preconditions.checkNotNull(host, "host property not set.");
+ Preconditions.checkNotNull(port, "port property not set.");
+ Preconditions.checkNotNull(virtualHost, "virtualHost property not set.");
+ Preconditions.checkNotNull(connectionName, "connectionName property not set.");
+ Preconditions.checkNotNull(queueName, "queueName property not set.");
+ }
+
+ public ConnectionFactory createConnectionFactory() {
+ ConnectionFactory connectionFactory = new ConnectionFactory();
+ connectionFactory.setHost(this.host);
+ connectionFactory.setUsername(this.username);
+ connectionFactory.setPassword(this.password);
+ connectionFactory.setVirtualHost(this.virtualHost);
+ connectionFactory.setRequestedChannelMax(this.requestedChannelMax);
+ connectionFactory.setRequestedFrameMax(this.requestedFrameMax);
+ connectionFactory.setConnectionTimeout(this.connectionTimeout);
+ connectionFactory.setHandshakeTimeout(this.handshakeTimeout);
+ connectionFactory.setRequestedHeartbeat(this.requestedHeartbeat);
+ connectionFactory.setPort(this.port);
+ return connectionFactory;
+ }
+}
\ No newline at end of file
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQConfig.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQConfig.java
deleted file mode 100644
index f4fd61f..0000000
--- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQConfig.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.pulsar.io.rabbitmq;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
-import lombok.*;
-import lombok.experimental.Accessors;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Map;
-import org.apache.pulsar.io.core.annotations.FieldDoc;
-
-/**
- * RabbitMQ source connector config.
- */
-@Data
-@Setter
-@Getter
-@EqualsAndHashCode
-@ToString
-@Accessors(chain = true)
-public class RabbitMQConfig implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- @FieldDoc(
- required = true,
- defaultValue = "",
- help = "The connection name used for connecting to RabbitMQ")
- private String connectionName;
- @FieldDoc(
- required = true,
- defaultValue = "",
- help = "The AMQ uri used for connecting to RabbitMQ")
- private String amqUri;
- @FieldDoc(
- required = true,
- defaultValue = "",
- help = "The RabbitMQ queue name")
- private String queueName;
-
- public static RabbitMQConfig load(String yamlFile) throws IOException {
- ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
- return mapper.readValue(new File(yamlFile), RabbitMQConfig.class);
- }
-
- public static RabbitMQConfig load(Map<String, Object> map) throws IOException {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.readValue(new ObjectMapper().writeValueAsString(map), RabbitMQConfig.class);
- }
-}
\ No newline at end of file
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
index 6bb9613..a261373 100644
--- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java
@@ -47,34 +47,36 @@
name = "rabbitmq",
type = IOType.SOURCE,
help = "A simple connector to move messages from a RabbitMQ queue to a Pulsar topic",
- configClass = RabbitMQConfig.class)
+ configClass = RabbitMQSourceConfig.class)
public class RabbitMQSource extends PushSource<byte[]> {
private static Logger logger = LoggerFactory.getLogger(RabbitMQSource.class);
private Connection rabbitMQConnection;
private Channel rabbitMQChannel;
- private RabbitMQConfig rabbitMQConfig;
+ private RabbitMQSourceConfig rabbitMQSourceConfig;
@Override
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
- rabbitMQConfig = RabbitMQConfig.load(config);
- if (rabbitMQConfig.getAmqUri() == null
- || rabbitMQConfig.getQueueName() == null) {
- throw new IllegalArgumentException("Required property not set.");
- }
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setUri(rabbitMQConfig.getAmqUri());
- rabbitMQConnection = connectionFactory.newConnection(rabbitMQConfig.getConnectionName());
+ rabbitMQSourceConfig = RabbitMQSourceConfig.load(config);
+ rabbitMQSourceConfig.validate();
+
+ ConnectionFactory connectionFactory = rabbitMQSourceConfig.createConnectionFactory();
+ rabbitMQConnection = connectionFactory.newConnection(rabbitMQSourceConfig.getConnectionName());
logger.info("A new connection to {}:{} has been opened successfully.",
rabbitMQConnection.getAddress().getCanonicalHostName(),
rabbitMQConnection.getPort()
);
rabbitMQChannel = rabbitMQConnection.createChannel();
- rabbitMQChannel.queueDeclare(rabbitMQConfig.getQueueName(), false, false, false, null);
+ rabbitMQChannel.queueDeclare(rabbitMQSourceConfig.getQueueName(), false, false, false, null);
+ logger.info("Setting channel.basicQos({}, {}).",
+ rabbitMQSourceConfig.getPrefetchCount(),
+ rabbitMQSourceConfig.isPrefetchGlobal()
+ );
+ rabbitMQChannel.basicQos(rabbitMQSourceConfig.getPrefetchCount(), rabbitMQSourceConfig.isPrefetchGlobal());
com.rabbitmq.client.Consumer consumer = new RabbitMQConsumer(this, rabbitMQChannel);
- rabbitMQChannel.basicConsume(rabbitMQConfig.getQueueName(), consumer);
- logger.info("A consumer for queue {} has been successfully started.", rabbitMQConfig.getQueueName());
+ rabbitMQChannel.basicConsume(rabbitMQSourceConfig.getQueueName(), consumer);
+ logger.info("A consumer for queue {} has been successfully started.", rabbitMQSourceConfig.getQueueName());
}
@Override
@@ -94,6 +96,10 @@
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
source.consume(new RabbitMQRecord(Optional.ofNullable(envelope.getRoutingKey()), body));
+ long deliveryTag = envelope.getDeliveryTag();
+ // positively acknowledge all deliveries up to this delivery tag to reduce network traffic
+ // since manual message acknowledgments are turned on by default
+ this.getChannel().basicAck(deliveryTag, true);
}
}
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSourceConfig.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSourceConfig.java
new file mode 100644
index 0000000..20508c0
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSourceConfig.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.rabbitmq;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.common.base.Preconditions;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.io.core.annotations.FieldDoc;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+
+@Data
+@Setter
+@Getter
+@EqualsAndHashCode(callSuper = false)
+@ToString
+@Accessors(chain = true)
+public class RabbitMQSourceConfig extends RabbitMQAbstractConfig implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "0",
+ help = "Maximum number of messages that the server will deliver, 0 for unlimited")
+ private int prefetchCount = 0;
+
+ @FieldDoc(
+ required = false,
+ defaultValue = "false",
+ help = "Set true if the settings should be applied to the entire channel rather than each consumer")
+ private boolean prefetchGlobal = false;
+
+ public static RabbitMQSourceConfig load(String yamlFile) throws IOException {
+ ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+ return mapper.readValue(new File(yamlFile), RabbitMQSourceConfig.class);
+ }
+
+ public static RabbitMQSourceConfig load(Map<String, Object> map) throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.readValue(new ObjectMapper().writeValueAsString(map), RabbitMQSourceConfig.class);
+ }
+
+ @Override
+ public void validate() {
+ super.validate();
+ Preconditions.checkArgument(prefetchCount >= 0, "prefetchCount must be non-negative.");
+ }
+}
diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/RabbitMQBrokerManager.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/RabbitMQBrokerManager.java
new file mode 100644
index 0000000..cdc8bea
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/RabbitMQBrokerManager.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.rabbitmq;
+
+import org.apache.qpid.server.Broker;
+import org.apache.qpid.server.BrokerOptions;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+public class RabbitMQBrokerManager {
+
+ private final String PORT = "5672";
+ private final Broker broker = new Broker();
+
+
+ public void startBroker() throws Exception {
+ BrokerOptions brokerOptions = getBrokerOptions();
+ broker.startup(brokerOptions);
+ }
+
+ public void stopBroker() {
+ broker.shutdown();
+ }
+
+ BrokerOptions getBrokerOptions() throws Exception {
+ Path tmpFolder = Files.createTempDirectory("qpidWork");
+ Path homeFolder = Files.createTempDirectory("qpidHome");
+ File etc = new File(homeFolder.toFile(), "etc");
+ etc.mkdir();
+ FileOutputStream fos = new FileOutputStream(new File(etc, "passwd"));
+ fos.write("guest:guest\n".getBytes());
+ fos.close();
+
+ BrokerOptions brokerOptions = new BrokerOptions();
+
+ brokerOptions.setConfigProperty("qpid.work_dir", tmpFolder.toAbsolutePath().toString());
+ brokerOptions.setConfigProperty("qpid.amqp_port", PORT);
+ brokerOptions.setConfigProperty("qpid.home_dir", homeFolder.toAbsolutePath().toString());
+ String configPath = getFile("qpid.json").getAbsolutePath();
+ brokerOptions.setInitialConfigurationLocation(configPath);
+
+ return brokerOptions;
+ }
+
+ private File getFile(String name) {
+ ClassLoader classLoader = getClass().getClassLoader();
+ return new File(classLoader.getResource(name).getFile());
+ }
+}
diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceConfigTest.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceConfigTest.java
new file mode 100644
index 0000000..a7e55dd
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceConfigTest.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.rabbitmq.source;
+
+import org.apache.pulsar.io.rabbitmq.RabbitMQSourceConfig;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+/**
+ * RabbitMQSourceConfig test
+ */
+public class RabbitMQSourceConfigTest {
+ @Test
+ public final void loadFromYamlFileTest() throws IOException {
+ File yamlFile = getFile("sourceConfig.yaml");
+ String path = yamlFile.getAbsolutePath();
+ RabbitMQSourceConfig config = RabbitMQSourceConfig.load(path);
+ assertNotNull(config);
+ assertEquals("localhost", config.getHost());
+ assertEquals(Integer.parseInt("5672"), config.getPort());
+ assertEquals("/", config.getVirtualHost());
+ assertEquals("guest", config.getUsername());
+ assertEquals("guest", config.getPassword());
+ assertEquals("test-queue", config.getQueueName());
+ assertEquals("test-connection", config.getConnectionName());
+ assertEquals(Integer.parseInt("0"), config.getRequestedChannelMax());
+ assertEquals(Integer.parseInt("0"), config.getRequestedFrameMax());
+ assertEquals(Integer.parseInt("60000"), config.getConnectionTimeout());
+ assertEquals(Integer.parseInt("10000"), config.getHandshakeTimeout());
+ assertEquals(Integer.parseInt("60"), config.getRequestedHeartbeat());
+ assertEquals(Integer.parseInt("0"), config.getPrefetchCount());
+ assertEquals(Boolean.parseBoolean("false"), config.isPrefetchGlobal());
+ }
+
+ @Test
+ public final void loadFromMapTest() throws IOException {
+ Map<String, Object> map = new HashMap<>();
+ map.put("host", "localhost");
+ map.put("port", "5672");
+ map.put("virtualHost", "/");
+ map.put("username", "guest");
+ map.put("password", "guest");
+ map.put("queueName", "test-queue");
+ map.put("connectionName", "test-connection");
+ map.put("requestedChannelMax", "0");
+ map.put("requestedFrameMax", "0");
+ map.put("connectionTimeout", "60000");
+ map.put("handshakeTimeout", "10000");
+ map.put("requestedHeartbeat", "60");
+ map.put("prefetchCount", "0");
+ map.put("prefetchGlobal", "false");
+
+ RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map);
+ assertNotNull(config);
+ assertEquals("localhost", config.getHost());
+ assertEquals(Integer.parseInt("5672"), config.getPort());
+ assertEquals("/", config.getVirtualHost());
+ assertEquals("guest", config.getUsername());
+ assertEquals("guest", config.getPassword());
+ assertEquals("test-queue", config.getQueueName());
+ assertEquals("test-connection", config.getConnectionName());
+ assertEquals(Integer.parseInt("0"), config.getRequestedChannelMax());
+ assertEquals(Integer.parseInt("0"), config.getRequestedFrameMax());
+ assertEquals(Integer.parseInt("60000"), config.getConnectionTimeout());
+ assertEquals(Integer.parseInt("10000"), config.getHandshakeTimeout());
+ assertEquals(Integer.parseInt("60"), config.getRequestedHeartbeat());
+ assertEquals(Integer.parseInt("0"), config.getPrefetchCount());
+ assertEquals(Boolean.parseBoolean("false"), config.isPrefetchGlobal());
+ }
+
+ @Test
+ public final void validValidateTest() throws IOException {
+ Map<String, Object> map = new HashMap<>();
+ map.put("host", "localhost");
+ map.put("port", "5672");
+ map.put("virtualHost", "/");
+ map.put("username", "guest");
+ map.put("password", "guest");
+ map.put("queueName", "test-queue");
+ map.put("connectionName", "test-connection");
+ map.put("requestedChannelMax", "0");
+ map.put("requestedFrameMax", "0");
+ map.put("connectionTimeout", "60000");
+ map.put("handshakeTimeout", "10000");
+ map.put("requestedHeartbeat", "60");
+ map.put("prefetchCount", "0");
+ map.put("prefetchGlobal", "false");
+
+ RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map);
+ config.validate();
+ }
+
+ @Test(expectedExceptions = NullPointerException.class,
+ expectedExceptionsMessageRegExp = "host property not set.")
+ public final void missingHostValidateTest() throws IOException {
+ Map<String, Object> map = new HashMap<>();
+ map.put("port", "5672");
+ map.put("virtualHost", "/");
+ map.put("username", "guest");
+ map.put("password", "guest");
+ map.put("queueName", "test-queue");
+ map.put("connectionName", "test-connection");
+ map.put("requestedChannelMax", "0");
+ map.put("requestedFrameMax", "0");
+ map.put("connectionTimeout", "60000");
+ map.put("handshakeTimeout", "10000");
+ map.put("requestedHeartbeat", "60");
+ map.put("prefetchCount", "0");
+ map.put("prefetchGlobal", "false");
+
+ RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map);
+ config.validate();
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = "prefetchCount must be non-negative.")
+ public final void invalidPrefetchCountTest() throws IOException {
+ Map<String, Object> map = new HashMap<>();
+ map.put("host", "localhost");
+ map.put("port", "5672");
+ map.put("virtualHost", "/");
+ map.put("username", "guest");
+ map.put("password", "guest");
+ map.put("queueName", "test-queue");
+ map.put("connectionName", "test-connection");
+ map.put("requestedChannelMax", "0");
+ map.put("requestedFrameMax", "0");
+ map.put("connectionTimeout", "60000");
+ map.put("handshakeTimeout", "10000");
+ map.put("requestedHeartbeat", "60");
+ map.put("prefetchCount", "-100");
+ map.put("prefetchGlobal", "false");
+
+ RabbitMQSourceConfig config = RabbitMQSourceConfig.load(map);
+ config.validate();
+ }
+
+ private File getFile(String name) {
+ ClassLoader classLoader = getClass().getClassLoader();
+ return new File(classLoader.getResource(name).getFile());
+ }
+}
diff --git a/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceTest.java b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceTest.java
new file mode 100644
index 0000000..11f27cb
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/test/java/org/apache/pulsar/io/rabbitmq/source/RabbitMQSourceTest.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.rabbitmq.source;
+
+import org.apache.pulsar.io.rabbitmq.RabbitMQBrokerManager;
+import org.apache.pulsar.io.rabbitmq.RabbitMQSource;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class RabbitMQSourceTest {
+
+ private RabbitMQBrokerManager rabbitMQBrokerManager;
+
+ @BeforeMethod
+ public void setUp() throws Exception {
+ rabbitMQBrokerManager = new RabbitMQBrokerManager();
+ rabbitMQBrokerManager.startBroker();
+ }
+
+ @AfterMethod
+ public void tearDown() throws Exception {
+ rabbitMQBrokerManager.stopBroker();
+ }
+
+ @Test
+ public void TestOpenAndWriteSink() throws Exception {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put("host", "localhost");
+ configs.put("port", "5672");
+ configs.put("virtualHost", "default");
+ configs.put("username", "guest");
+ configs.put("password", "guest");
+ configs.put("queueName", "test-queue");
+ configs.put("connectionName", "test-connection");
+ configs.put("requestedChannelMax", "0");
+ configs.put("requestedFrameMax", "0");
+ configs.put("connectionTimeout", "60000");
+ configs.put("handshakeTimeout", "10000");
+ configs.put("requestedHeartbeat", "60");
+ configs.put("prefetchCount", "0");
+ configs.put("prefetchGlobal", "false");
+
+ RabbitMQSource source = new RabbitMQSource();
+
+ // open should success
+ source.open(configs, null);
+ }
+
+}
diff --git a/pulsar-io/rabbitmq/src/test/resources/qpid.json b/pulsar-io/rabbitmq/src/test/resources/qpid.json
new file mode 100644
index 0000000..6a0381f
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/test/resources/qpid.json
@@ -0,0 +1,37 @@
+{
+ "name": "EmbeddedBroker",
+ "modelVersion": "2.0",
+ "storeVersion": 1,
+ "authenticationproviders": [
+ {
+ "name": "noPassword",
+ "type": "Anonymous",
+ "secureOnlyMechanisms": []
+ },
+ {
+ "name": "passwordFile",
+ "type": "PlainPasswordFile",
+ "path": "${qpid.home_dir}${file.separator}etc${file.separator}passwd",
+ "secureOnlyMechanisms": []
+ }
+ ],
+ "ports": [
+ {
+ "name": "AMQP",
+ "port": "${qpid.amqp_port}",
+ "authenticationProvider": "passwordFile",
+ "protocols": [
+ "AMQP_0_9_1"
+ ]
+ }
+ ],
+ "virtualhostnodes": [
+ {
+ "name": "default",
+ "type": "JSON",
+ "defaultVirtualHostNode": "true",
+ "virtualHostInitialConfiguration": "${qpid.initial_config_virtualhost_config}",
+ "storeType": "DERBY"
+ }
+ ]
+}
\ No newline at end of file
diff --git a/pulsar-io/rabbitmq/src/test/resources/sourceConfig.yaml b/pulsar-io/rabbitmq/src/test/resources/sourceConfig.yaml
new file mode 100644
index 0000000..4ec558f
--- /dev/null
+++ b/pulsar-io/rabbitmq/src/test/resources/sourceConfig.yaml
@@ -0,0 +1,36 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+{
+"host": "localhost",
+"port": "5672",
+"virtualHost": "/",
+"username": "guest",
+"password": "guest",
+"queueName": "test-queue",
+"connectionName": "test-connection",
+"requestedChannelMax": "0",
+"requestedFrameMax": "0",
+"connectionTimeout": "60000",
+"handshakeTimeout": "10000",
+"requestedHeartbeat": "60",
+"prefetchCount": "0",
+"prefetchGlobal": "false"
+
+}