[pulsar-flink] Add subscription initial position (#4129)

### Motivation

Allow user to specify the initial position for consumer source builder.

### Modifications

Add initial position for PulsarConsumerSource.
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
index 5af82bc..046c7d3 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
@@ -32,6 +32,7 @@
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.client.api.Authentication;
 import org.slf4j.Logger;
@@ -71,6 +72,7 @@
 
     private final long acknowledgementBatchSize;
     private long batchCount;
+    private final SubscriptionInitialPosition initialPosition;
 
     private transient volatile boolean isRunning;
 
@@ -83,6 +85,7 @@
         this.deserializer = builder.deserializationSchema;
         this.subscriptionName = builder.subscriptionName;
         this.acknowledgementBatchSize = builder.acknowledgementBatchSize;
+        this.initialPosition = builder.initialPosition;
     }
 
     @Override
@@ -203,12 +206,14 @@
             return client.newConsumer().topicsPattern(topicsPattern)
                     .subscriptionName(subscriptionName)
                     .subscriptionType(SubscriptionType.Failover)
+                    .subscriptionInitialPosition(initialPosition)
                     .subscribe();
         } else {
             return client.newConsumer()
                     .topics(Lists.newArrayList(topicNames))
                     .subscriptionName(subscriptionName)
                     .subscriptionType(SubscriptionType.Failover)
+                    .subscriptionInitialPosition(initialPosition)
                     .subscribe();
         }
     }
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
index 3b78495..4ca8361 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
@@ -26,6 +26,7 @@
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationFactory;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 
 import java.util.Arrays;
 import java.util.List;
@@ -51,6 +52,7 @@
     Pattern topicsPattern;
     String subscriptionName = "flink-sub";
     long acknowledgementBatchSize = ACKNOWLEDGEMENT_BATCH_SIZE;
+    SubscriptionInitialPosition initialPosition = SubscriptionInitialPosition.Latest;
 
     private PulsarSourceBuilder(DeserializationSchema<T> deserializationSchema) {
         this.deserializationSchema = deserializationSchema;
@@ -154,6 +156,18 @@
     }
 
     /**
+     * Sets the subscription initial position for the topic consumer. Default is {@link SubscriptionInitialPosition#Latest}
+     *
+     * @param initialPosition the subscription initial position.
+     * @return this builder
+     */
+    public PulsarSourceBuilder<T> subscriptionInitialPosition(SubscriptionInitialPosition initialPosition) {
+        Preconditions.checkNotNull(initialPosition,"subscription initial position cannot be null");
+        this.initialPosition = initialPosition;
+        return this;
+    }
+
+    /**
      * Sets the number of messages to receive before acknowledging. This defaults to 100. This
      * value is only used when checkpointing is disabled.
      *
diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
index 4f59b6e..67a2433 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
@@ -21,6 +21,7 @@
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -47,6 +48,7 @@
                 .serviceUrl("testServiceUrl")
                 .topic("testTopic")
                 .subscriptionName("testSubscriptionName")
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                 .build();
         Assert.assertNotNull(sourceFunction);
     }
@@ -112,6 +114,11 @@
         pulsarSourceBuilder.subscriptionName(" ");
     }
 
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testSubscriptionInitialPosition() {
+        pulsarSourceBuilder.subscriptionInitialPosition(null);
+    }
+
     private class TestDeserializationSchema<T> implements DeserializationSchema<T> {
 
         @Override