[pulsar-kafka] support block-producer on queue-full with sendTimeOut configuration (#6139)

### Motivation
Right now, pulsar-kafka producer block the publish when queue is full if `sendTimeOut > 0`. However, we have multiple users who want to configure `sendTimeOut` but doesn't want to block the thread and need immediate failure.
So, add option to configure `BLOCK_IF_PRODUCER_QUEUE_FULL` and it will not impact existing behavior because if the `BLOCK_IF_PRODUCER_QUEUE_FULL` doesn't exist then it will fallback to existing behavior.
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
index cacca60..13f97b5 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/kafka/clients/producer/PulsarKafkaProducer.java
@@ -168,7 +168,9 @@
         // Kafka blocking semantic when blockOnBufferFull=false is different from Pulsar client
         // Pulsar throws error immediately when the queue is full and blockIfQueueFull=false
         // Kafka, on the other hand, still blocks for "max.block.ms" time and then gives error.
-        boolean shouldBlockPulsarProducer = sendTimeoutMillis > 0;
+        Boolean sendTimeOutConfigured = sendTimeoutMillis > 0;
+        boolean shouldBlockPulsarProducer = Boolean.getBoolean(properties
+                .getProperty(PulsarProducerKafkaConfig.BLOCK_IF_PRODUCER_QUEUE_FULL, sendTimeOutConfigured.toString()));
         pulsarProducerBuilder.blockIfQueueFull(shouldBlockPulsarProducer);
 
         interceptors = (List) producerConfig.getConfiguredInstances(
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
index 5a9a651..7554faf 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/main/java/org/apache/pulsar/client/kafka/compat/PulsarProducerKafkaConfig.java
@@ -22,6 +22,7 @@
 
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException.ProducerQueueIsFullError;
 
 public class PulsarProducerKafkaConfig {
 
@@ -33,6 +34,11 @@
     public static final String MAX_PENDING_MESSAGES_ACROSS_PARTITIONS = "pulsar.producer.max.pending.messages.across.partitions";
     public static final String BATCHING_ENABLED = "pulsar.producer.batching.enabled";
     public static final String BATCHING_MAX_MESSAGES = "pulsar.producer.batching.max.messages";
+    /**
+     * send operations will immediately fail with {@link ProducerQueueIsFullError} when there is no space left in
+     * pending queue.
+     **/
+    public static final String BLOCK_IF_PRODUCER_QUEUE_FULL = "pulsar.block.if.producer.queue.full";
 
     public static ProducerBuilder<byte[]> getProducerBuilder(PulsarClient client, Properties properties) {
         ProducerBuilder<byte[]> producerBuilder = client.newProducer();
diff --git a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
index 21a4f8d..b1069ad 100644
--- a/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
+++ b/pulsar-client-kafka-compat/pulsar-client-kafka/src/test/java/org/apache/kafka/clients/producer/PulsarKafkaProducerTest.java
@@ -116,11 +116,13 @@
         properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Arrays.asList("pulsar://localhost:6650"));
         properties.put(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "1000000");
         properties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "1000000");
+        properties.put(PulsarProducerKafkaConfig.BLOCK_IF_PRODUCER_QUEUE_FULL, Boolean.FALSE.toString());
 
         new PulsarKafkaProducer<>(properties);
 
         verify(mockClientBuilder, times(1)).keepAliveInterval(1000, TimeUnit.SECONDS);
         verify(mockProducerBuilder, times(1)).sendTimeout(1000000, TimeUnit.MILLISECONDS);
+        verify(mockProducerBuilder, times(1)).blockIfQueueFull(false);
     }
 
     @Test