[pulsar-flink] Add subscription initial position (#4129)
### Motivation
Allow user to specify the initial position for consumer source builder.
### Modifications
Add initial position for PulsarConsumerSource.
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
index 5af82bc..046c7d3 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
@@ -32,6 +32,7 @@
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.Authentication;
import org.slf4j.Logger;
@@ -71,6 +72,7 @@
private final long acknowledgementBatchSize;
private long batchCount;
+ private final SubscriptionInitialPosition initialPosition;
private transient volatile boolean isRunning;
@@ -83,6 +85,7 @@
this.deserializer = builder.deserializationSchema;
this.subscriptionName = builder.subscriptionName;
this.acknowledgementBatchSize = builder.acknowledgementBatchSize;
+ this.initialPosition = builder.initialPosition;
}
@Override
@@ -203,12 +206,14 @@
return client.newConsumer().topicsPattern(topicsPattern)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Failover)
+ .subscriptionInitialPosition(initialPosition)
.subscribe();
} else {
return client.newConsumer()
.topics(Lists.newArrayList(topicNames))
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Failover)
+ .subscriptionInitialPosition(initialPosition)
.subscribe();
}
}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
index 3b78495..4ca8361 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
@@ -26,6 +26,7 @@
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import java.util.Arrays;
import java.util.List;
@@ -51,6 +52,7 @@
Pattern topicsPattern;
String subscriptionName = "flink-sub";
long acknowledgementBatchSize = ACKNOWLEDGEMENT_BATCH_SIZE;
+ SubscriptionInitialPosition initialPosition = SubscriptionInitialPosition.Latest;
private PulsarSourceBuilder(DeserializationSchema<T> deserializationSchema) {
this.deserializationSchema = deserializationSchema;
@@ -154,6 +156,18 @@
}
/**
+ * Sets the subscription initial position for the topic consumer. Default is {@link SubscriptionInitialPosition#Latest}
+ *
+ * @param initialPosition the subscription initial position.
+ * @return this builder
+ */
+ public PulsarSourceBuilder<T> subscriptionInitialPosition(SubscriptionInitialPosition initialPosition) {
+ Preconditions.checkNotNull(initialPosition,"subscription initial position cannot be null");
+ this.initialPosition = initialPosition;
+ return this;
+ }
+
+ /**
* Sets the number of messages to receive before acknowledging. This defaults to 100. This
* value is only used when checkpointing is disabled.
*
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
index 4f59b6e..67a2433 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
@@ -21,6 +21,7 @@
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -47,6 +48,7 @@
.serviceUrl("testServiceUrl")
.topic("testTopic")
.subscriptionName("testSubscriptionName")
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.build();
Assert.assertNotNull(sourceFunction);
}
@@ -112,6 +114,11 @@
pulsarSourceBuilder.subscriptionName(" ");
}
+ @Test(expectedExceptions = NullPointerException.class)
+ public void testSubscriptionInitialPosition() {
+ pulsarSourceBuilder.subscriptionInitialPosition(null);
+ }
+
private class TestDeserializationSchema<T> implements DeserializationSchema<T> {
@Override