commit | f8c1c635e0c4a3847286fb3385e70a8898b74026 | [log] [tgz] |
---|---|---|
author | Yufan Sheng <yufan@streamnative.io> | Fri Sep 10 12:10:12 2021 +0800 |
committer | Arvid Heise <AHeise@users.noreply.github.com> | Fri Sep 10 14:52:01 2021 +0200 |
tree | 50834c8bba53e8da9eecbdb8a6ab2a91477ea3b8 | |
parent | 172e6450432309d05ffcb2ec746ce33f0976b8c6 [diff] |
[FLINK-23864][connector/pulsar] Release Pulsar Message if user enable poolMessage option.
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java index ac1821b..af650ea 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarOrderedPartitionSplitReader.java
@@ -74,6 +74,9 @@ protected void finishedPollMessage(Message<byte[]> message) { // Nothing to do here. LOG.debug("Finished polling message {}", message); + + // Release message + message.release(); } @Override
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java index efa383f..9c8e3d7 100644 --- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java +++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/reader/split/PulsarUnorderedPartitionSplitReader.java
@@ -118,6 +118,9 @@ if (sourceConfiguration.isEnableAutoAcknowledgeMessage()) { sneakyClient(() -> pulsarConsumer.acknowledge(message)); } + + // Release message + message.release(); } @Override