[FLINK-28820][Connector/Pulsar] Improve the writing performance for PulsarSink (#57)

Signed-off-by: tison <wander4096@gmail.com>
diff --git a/docs/layouts/shortcodes/generated/pulsar_sink_configuration.html b/docs/layouts/shortcodes/generated/pulsar_sink_configuration.html
index c1ff77e..cd7f803 100644
--- a/docs/layouts/shortcodes/generated/pulsar_sink_configuration.html
+++ b/docs/layouts/shortcodes/generated/pulsar_sink_configuration.html
@@ -21,12 +21,6 @@
             <td>If you enable this option and use PulsarSerializationSchema.pulsarSchema(), we would consume and deserialize the message by using Pulsar's <code class="highlighter-rouge">Schema</code>.</td>
         </tr>
         <tr>
-            <td><h5>pulsar.sink.maxPendingMessages</h5></td>
-            <td style="word-wrap: break-word;">1000</td>
-            <td>Integer</td>
-            <td>The maximum number of pending messages in one sink parallelism.</td>
-        </tr>
-        <tr>
             <td><h5>pulsar.sink.maxRecommitTimes</h5></td>
             <td style="word-wrap: break-word;">5</td>
             <td>Integer</td>
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
index dafb8aa..0433bb0 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/PulsarSinkOptions.java
@@ -127,6 +127,8 @@
                             "The allowed transaction recommit times if we meet some retryable exception."
                                     + " This is used in Pulsar Transaction.");
 
+    /** @deprecated This option is no longer used; it was dropped for better performance. */
+    @Deprecated
     public static final ConfigOption<Integer> PULSAR_MAX_PENDING_MESSAGES_ON_PARALLELISM =
             ConfigOptions.key(SINK_CONFIG_PREFIX + "maxPendingMessages")
                     .intType()
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
index 5f935a4..61cfd5a 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/PulsarSinkConfigUtils.java
@@ -84,10 +84,6 @@
                 PULSAR_SEND_TIMEOUT_MS,
                 Math::toIntExact,
                 ms -> builder.sendTimeout(ms, MILLISECONDS));
-        configuration.useOption(PULSAR_MAX_PENDING_MESSAGES, builder::maxPendingMessages);
-        configuration.useOption(
-                PULSAR_MAX_PENDING_MESSAGES_ACROSS_PARTITIONS,
-                builder::maxPendingMessagesAcrossPartitions);
         configuration.useOption(
                 PULSAR_BATCHING_MAX_PUBLISH_DELAY_MICROS,
                 s -> builder.batchingMaxPublishDelay(s, MICROSECONDS));
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
index 768b730..d6a6ee3 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/config/SinkConfiguration.java
@@ -32,7 +32,6 @@
 import java.util.Objects;
 
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_BATCHING_MAX_MESSAGES;
-import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_PENDING_MESSAGES_ON_PARALLELISM;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MAX_RECOMMIT_TIMES;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_MESSAGE_KEY_HASH;
 import static org.apache.flink.connector.pulsar.sink.PulsarSinkOptions.PULSAR_TOPIC_METADATA_REFRESH_INTERVAL;
@@ -51,7 +50,6 @@
     private final int partitionSwitchSize;
     private final MessageKeyHash messageKeyHash;
     private final boolean enableSchemaEvolution;
-    private final int maxPendingMessages;
     private final int maxRecommitTimes;
 
     public SinkConfiguration(Configuration configuration) {
@@ -63,7 +61,6 @@
         this.partitionSwitchSize = getInteger(PULSAR_BATCHING_MAX_MESSAGES);
         this.messageKeyHash = get(PULSAR_MESSAGE_KEY_HASH);
         this.enableSchemaEvolution = get(PULSAR_WRITE_SCHEMA_EVOLUTION);
-        this.maxPendingMessages = get(PULSAR_MAX_PENDING_MESSAGES_ON_PARALLELISM);
         this.maxRecommitTimes = get(PULSAR_MAX_RECOMMIT_TIMES);
     }
 
@@ -111,14 +108,6 @@
         return enableSchemaEvolution;
     }
 
-    /**
-     * Pulsar message is sent asynchronously. Set this option for limiting the pending messages in a
-     * Pulsar writer instance.
-     */
-    public int getMaxPendingMessages() {
-        return maxPendingMessages;
-    }
-
     /** The maximum allowed recommitting time for a Pulsar transaction. */
     public int getMaxRecommitTimes() {
         return maxRecommitTimes;
@@ -141,7 +130,6 @@
                 && partitionSwitchSize == that.partitionSwitchSize
                 && enableSchemaEvolution == that.enableSchemaEvolution
                 && messageKeyHash == that.messageKeyHash
-                && maxPendingMessages == that.maxPendingMessages
                 && maxRecommitTimes == that.maxRecommitTimes;
     }
 
@@ -154,7 +142,6 @@
                 partitionSwitchSize,
                 messageKeyHash,
                 enableSchemaEvolution,
-                maxPendingMessages,
                 maxRecommitTimes);
     }
 }
diff --git a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
index 927e40c..43c5fed 100644
--- a/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
+++ b/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/PulsarWriter.java
@@ -48,7 +48,7 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static java.util.Collections.emptyList;
 import static org.apache.flink.util.IOUtils.closeAll;
@@ -64,7 +64,6 @@
 public class PulsarWriter<IN> implements PrecommittingSinkWriter<IN, PulsarCommittable> {
     private static final Logger LOG = LoggerFactory.getLogger(PulsarWriter.class);
 
-    private final SinkConfiguration sinkConfiguration;
     private final PulsarSerializationSchema<IN> serializationSchema;
     private final TopicMetadataListener metadataListener;
     private final TopicRouter<IN> topicRouter;
@@ -74,7 +73,7 @@
     private final MailboxExecutor mailboxExecutor;
     private final TopicProducerRegister producerRegister;
 
-    private long pendingMessages = 0;
+    private final AtomicLong pendingMessages = new AtomicLong(0);
 
     /**
      * Constructor creating a Pulsar writer.
@@ -96,7 +95,7 @@
             TopicRouter<IN> topicRouter,
             MessageDelayer<IN> messageDelayer,
             InitContext initContext) {
-        this.sinkConfiguration = checkNotNull(sinkConfiguration);
+        checkNotNull(sinkConfiguration);
         this.serializationSchema = checkNotNull(serializationSchema);
         this.metadataListener = checkNotNull(metadataListener);
         this.topicRouter = checkNotNull(topicRouter);
@@ -105,7 +104,6 @@
 
         this.deliveryGuarantee = sinkConfiguration.getDeliveryGuarantee();
         this.sinkContext = new PulsarSinkContextImpl(initContext, sinkConfiguration);
-        this.mailboxExecutor = initContext.getMailboxExecutor();
 
         // Initialize topic metadata listener.
         LOG.debug("Initialize topic metadata after creating Pulsar writer.");
@@ -126,6 +124,7 @@
 
         // Create this producer register after opening serialization schema!
         this.producerRegister = new TopicProducerRegister(sinkConfiguration);
+        this.mailboxExecutor = initContext.getMailboxExecutor();
     }
 
     @Override
@@ -151,44 +150,30 @@
             // We would just ignore the sending exception. This may cause data loss.
             builder.sendAsync();
         } else {
-            // Waiting for permits to write message.
-            requirePermits();
-            mailboxExecutor.execute(
-                    () -> enqueueMessageSending(topic, builder),
-                    "Failed to send message to Pulsar");
+            // Increase the pending message count.
+            pendingMessages.incrementAndGet();
+            builder.sendAsync()
+                    .whenComplete(
+                            (id, ex) -> {
+                                pendingMessages.decrementAndGet();
+                                if (ex != null) {
+                                    mailboxExecutor.execute(
+                                            () -> {
+                                                throw new FlinkRuntimeException(
+                                                        "Failed to send data to Pulsar " + topic,
+                                                        ex);
+                                            },
+                                            "Failed to send data to Pulsar");
+                                } else {
+                                    LOG.debug(
+                                            "Sent message to Pulsar {} with message id {}",
+                                            topic,
+                                            id);
+                                }
+                            });
         }
     }
 
-    private void enqueueMessageSending(String topic, TypedMessageBuilder<?> builder)
-            throws ExecutionException, InterruptedException {
-        // Block the mailbox executor for yield method.
-        builder.sendAsync()
-                .whenComplete(
-                        (id, ex) -> {
-                            this.releasePermits();
-                            if (ex != null) {
-                                throw new FlinkRuntimeException(
-                                        "Failed to send data to Pulsar " + topic, ex);
-                            } else {
-                                LOG.debug(
-                                        "Sent message to Pulsar {} with message id {}", topic, id);
-                            }
-                        })
-                .get();
-    }
-
-    private void requirePermits() throws InterruptedException {
-        while (pendingMessages >= sinkConfiguration.getMaxPendingMessages()) {
-            LOG.info("Waiting for the available permits.");
-            mailboxExecutor.yield();
-        }
-        pendingMessages++;
-    }
-
-    private void releasePermits() {
-        this.pendingMessages -= 1;
-    }
-
     @SuppressWarnings({"rawtypes", "unchecked"})
     private TypedMessageBuilder<?> createMessageBuilder(
             String topic, Context context, PulsarMessage<?> message) {
@@ -244,15 +229,14 @@
     }
 
     @Override
-    public void flush(boolean endOfInput) throws IOException, InterruptedException {
-        if (endOfInput) {
-            // Try flush only once when we meet the end of the input.
+    public void flush(boolean endOfInput) throws IOException {
+        if (endOfInput || deliveryGuarantee != DeliveryGuarantee.NONE) {
+            LOG.info("Flush the pending messages to Pulsar.");
+            // Try to flush pending messages.
             producerRegister.flush();
-        } else {
-            while (pendingMessages != 0 && deliveryGuarantee != DeliveryGuarantee.NONE) {
+            // Make sure all the pending messages are flushed to Pulsar.
+            while (pendingMessages.longValue() > 0) {
                 producerRegister.flush();
-                LOG.info("Flush the pending messages to Pulsar.");
-                mailboxExecutor.yield();
             }
         }
     }