Fix break changes in namespace offload policy and RabbitMQ sink. (#7011)

### Motivation

Fix break changes in namespace offload policy and RabbitMQ sink.
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQAbstractConfig.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQAbstractConfig.java
index a5a210e..065392d 100644
--- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQAbstractConfig.java
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQAbstractConfig.java
@@ -75,6 +75,12 @@
     private String password = "guest";
 
     @FieldDoc(
+            required = false,
+            defaultValue = "",
+            help = "The RabbitMQ queue name from which messages should be read from or written to")
+    private String queueName;
+
+    @FieldDoc(
         required = false,
         defaultValue = "0",
         help = "Initially requested maximum channel number. 0 for unlimited")
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
index 3166ba2..38437c7 100644
--- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSink.java
@@ -23,6 +23,7 @@
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ConnectionFactory;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
@@ -49,6 +50,7 @@
     private Channel rabbitMQChannel;
     private RabbitMQSinkConfig rabbitMQSinkConfig;
     private String exchangeName;
+    private String defaultRoutingKey;
 
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
@@ -63,17 +65,26 @@
         );
 
         exchangeName = rabbitMQSinkConfig.getExchangeName();
+        defaultRoutingKey = rabbitMQSinkConfig.getRoutingKey();
         String exchangeType = rabbitMQSinkConfig.getExchangeType();
 
         rabbitMQChannel = rabbitMQConnection.createChannel();
-        rabbitMQChannel.exchangeDeclare(exchangeName, exchangeType, true);
+        String queueName = rabbitMQSinkConfig.getQueueName();
+        if (StringUtils.isNotEmpty(queueName)) {
+            rabbitMQChannel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true);
+            rabbitMQChannel.queueDeclare(rabbitMQSinkConfig.getQueueName(), true, false, false, null);
+            rabbitMQChannel.queueBind(rabbitMQSinkConfig.getQueueName(), exchangeName, defaultRoutingKey);
+        } else {
+            rabbitMQChannel.exchangeDeclare(exchangeName, exchangeType, true);
+        }
     }
 
     @Override
     public void write(Record<byte[]> record) {
         byte[] value = record.getValue();
         try {
-            rabbitMQChannel.basicPublish(exchangeName, record.getProperties().get("routingKey"), null, value);
+            String routingKey = record.getProperties().get("routingKey");
+            rabbitMQChannel.basicPublish(exchangeName, StringUtils.isEmpty(routingKey) ? defaultRoutingKey : routingKey, null, value);
             record.ack();
         } catch (IOException e) {
             record.fail();
diff --git a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java
index a276d4f..7477b98 100644
--- a/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java
+++ b/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSinkConfig.java
@@ -45,6 +45,12 @@
     private String exchangeName;
 
     @FieldDoc(
+            required = false,
+            defaultValue = "",
+            help = "The routing key used for publishing the messages")
+    private String routingKey;
+
+    @FieldDoc(
         required = false,
         defaultValue = "topic",
         help = "The exchange type to publish the messages on")