[FLINK-23969][connector/pulsar] Create e2e tests for pulsar connector.
diff --git a/flink-connectors/flink-connector-pulsar/pom.xml b/flink-connectors/flink-connector-pulsar/pom.xml
index 9992ef9..c3e5e9a 100644
--- a/flink-connectors/flink-connector-pulsar/pom.xml
+++ b/flink-connectors/flink-connector-pulsar/pom.xml
@@ -169,6 +169,12 @@
 			<groupId>org.apache.pulsar</groupId>
 			<artifactId>pulsar-client-all</artifactId>
 			<version>${pulsar.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.apache.pulsar</groupId>
+					<artifactId>pulsar-package-core</artifactId>
+				</exclusion>
+			</exclusions>
 		</dependency>
 	</dependencies>
 
@@ -258,6 +264,47 @@
 					</execution>
 				</executions>
 			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<executions>
+					<execution>
+						<goals>
+							<goal>test-jar</goal>
+						</goals>
+						<configuration>
+							<includes>
+								<include>**/testutils/**</include>
+								<include>META-INF/LICENSE</include>
+								<include>META-INF/NOTICE</include>
+							</includes>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-source-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>attach-test-sources</id>
+						<goals>
+							<goal>test-jar-no-fork</goal>
+						</goals>
+						<configuration>
+							<archive>
+								<!-- Globally exclude maven metadata, because it may accidentally bundle files we don't intend to -->
+								<addMavenDescriptor>false</addMavenDescriptor>
+							</archive>
+							<includes>
+								<include>**/testutils/**</include>
+								<include>META-INF/LICENSE</include>
+								<include>META-INF/NOTICE</include>
+							</includes>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
 		</plugins>
 	</build>
 </project>
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicRange.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicRange.java
index 1d1574b..5b77922 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicRange.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicRange.java
@@ -57,6 +57,8 @@
     public TopicRange(int start, int end) {
         checkArgument(start >= MIN_RANGE, "Start range %s shouldn't below zero.", start);
         checkArgument(end <= MAX_RANGE, "End range %s shouldn't exceed 65535.", end);
+        checkArgument(start <= end, "Range end must >= range start.");
+
         this.start = start;
         this.end = end;
     }
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedRangeGenerator.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedRangeGenerator.java
new file mode 100644
index 0000000..6f82725
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedRangeGenerator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.topic.range;
+
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
+
+import java.util.List;
+
+/** Always return the same range set for all topics. */
+public class FixedRangeGenerator implements RangeGenerator {
+    private static final long serialVersionUID = -3895203007855538734L;
+
+    private final List<TopicRange> ranges;
+
+    public FixedRangeGenerator(List<TopicRange> ranges) {
+        this.ranges = ranges;
+    }
+
+    @Override
+    public List<TopicRange> range(TopicMetadata metadata, int parallelism) {
+        return ranges;
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
index 89457dc..8cc2e0a 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
@@ -37,7 +37,7 @@
     @TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
 
     // Defines pulsar running environment
-    @ExternalSystem PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.MOCK);
+    @ExternalSystem PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.mock());
 
     // Defines a external context Factories,
     // so test cases will be invoked using this external contexts.
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarPartitionDataWriter.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarPartitionDataWriter.java
index ea9c4ab..c2afee5 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarPartitionDataWriter.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarPartitionDataWriter.java
@@ -18,41 +18,31 @@
 
 package org.apache.flink.connector.pulsar.testutils;
 
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
 import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
 
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 
 import java.util.Collection;
 
-import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
-
 /** Source split data writer for writing test data into a Pulsar topic partition. */
 public class PulsarPartitionDataWriter implements SourceSplitDataWriter<String> {
 
-    private final Producer<String> producer;
+    private final PulsarRuntimeOperator operator;
+    private final String fullTopicName;
 
-    public PulsarPartitionDataWriter(PulsarClient client, TopicPartition partition) {
-        try {
-            this.producer =
-                    client.newProducer(Schema.STRING).topic(partition.getFullTopicName()).create();
-        } catch (PulsarClientException e) {
-            throw new IllegalStateException(e);
-        }
+    public PulsarPartitionDataWriter(PulsarRuntimeOperator operator, String fullTopicName) {
+        this.operator = operator;
+        this.fullTopicName = fullTopicName;
     }
 
     @Override
     public void writeRecords(Collection<String> records) {
-        for (String record : records) {
-            sneakyClient(() -> producer.newMessage().value(record).send());
-        }
+        operator.sendMessages(fullTopicName, Schema.STRING, records);
     }
 
     @Override
-    public void close() throws Exception {
-        producer.close();
+    public void close() {
+        // Nothing to do.
     }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
index a80d721..2ad4c2f 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
@@ -29,15 +29,14 @@
 
 /** Common test context for pulsar based test. */
 public abstract class PulsarTestContext<T> implements ExternalContext<T> {
+    private static final long serialVersionUID = 4717940854368532130L;
 
     private static final int NUM_RECORDS_UPPER_BOUND = 500;
     private static final int NUM_RECORDS_LOWER_BOUND = 100;
 
-    private final String displayName;
     protected final PulsarRuntimeOperator operator;
 
-    protected PulsarTestContext(String displayName, PulsarTestEnvironment environment) {
-        this.displayName = displayName;
+    protected PulsarTestContext(PulsarTestEnvironment environment) {
         this.operator = environment.operator();
     }
 
@@ -58,8 +57,10 @@
         return records;
     }
 
+    protected abstract String displayName();
+
     @Override
     public String toString() {
-        return displayName;
+        return displayName();
     }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestEnvironment.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestEnvironment.java
index 7b31c7c..50ca3fe 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestEnvironment.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestEnvironment.java
@@ -20,7 +20,6 @@
 
 import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
 import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
-import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeProvider;
 import org.apache.flink.connectors.test.common.TestResource;
 import org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem;
 
@@ -54,10 +53,10 @@
 public class PulsarTestEnvironment
         implements BeforeAllCallback, AfterAllCallback, TestResource, TestRule {
 
-    private final PulsarRuntimeProvider provider;
+    private final PulsarRuntime runtime;
 
     public PulsarTestEnvironment(PulsarRuntime runtime) {
-        this.provider = runtime.provider();
+        this.runtime = runtime;
     }
 
     /** JUnit 4 Rule based test logic. */
@@ -66,7 +65,7 @@
         return new Statement() {
             @Override
             public void evaluate() throws Throwable {
-                provider.startUp();
+                runtime.startUp();
 
                 List<Throwable> errors = new ArrayList<>();
                 try {
@@ -75,7 +74,7 @@
                     errors.add(t);
                 } finally {
                     try {
-                        provider.tearDown();
+                        runtime.tearDown();
                     } catch (Throwable t) {
                         errors.add(t);
                     }
@@ -88,29 +87,29 @@
     /** JUnit 5 Extension setup method. */
     @Override
     public void beforeAll(ExtensionContext context) {
-        provider.startUp();
+        runtime.startUp();
     }
 
     /** flink-connector-testing setup method. */
     @Override
     public void startUp() {
-        provider.startUp();
+        runtime.startUp();
     }
 
     /** JUnit 5 Extension shutdown method. */
     @Override
     public void afterAll(ExtensionContext context) {
-        provider.tearDown();
+        runtime.tearDown();
     }
 
     /** flink-connector-testing shutdown method. */
     @Override
     public void tearDown() {
-        provider.tearDown();
+        runtime.tearDown();
     }
 
     /** Get a common supported set of method for operating pulsar which is in container. */
     public PulsarRuntimeOperator operator() {
-        return provider.operator();
+        return runtime.operator();
     }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java
index 2321bd4..18a8655 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java
@@ -56,7 +56,7 @@
      * pulsar broker. Override this method when needs.
      */
     protected PulsarRuntime runtime() {
-        return PulsarRuntime.MOCK;
+        return PulsarRuntime.mock();
     }
 
     /** Operate pulsar by acquiring a runtime operator. */
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
index 7ce676c..12dbabe 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
@@ -18,96 +18,33 @@
 
 package org.apache.flink.connector.pulsar.testutils.cases;
 
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.Source;
-import org.apache.flink.connector.pulsar.source.PulsarSource;
-import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
 
-import org.apache.pulsar.client.api.RegexSubscriptionMode;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
-import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
-import static org.apache.pulsar.client.api.Schema.STRING;
-import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
+import org.apache.pulsar.client.api.SubscriptionType;
 
 /**
  * Pulsar external context that will create multiple topics with only one partitions as source
  * splits.
  */
-public class MultipleTopicConsumingContext extends PulsarTestContext<String> {
-
-    private int numTopics = 0;
-
-    private final String topicPattern;
-
-    private final Map<String, SourceSplitDataWriter<String>> topicNameToSplitWriters =
-            new HashMap<>();
+public class MultipleTopicConsumingContext extends MultipleTopicTemplateContext {
+    private static final long serialVersionUID = -3855336888090886528L;
 
     public MultipleTopicConsumingContext(PulsarTestEnvironment environment) {
-        super("consuming message on multiple topic", environment);
-        this.topicPattern =
-                "pulsar-multiple-topic-[0-9]+-"
-                        + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+        super(environment);
     }
 
     @Override
-    public Source<String, ?, ?> createSource(Boundedness boundedness) {
-        PulsarSourceBuilder<String> builder =
-                PulsarSource.builder()
-                        .setDeserializationSchema(pulsarSchema(STRING))
-                        .setServiceUrl(operator.serviceUrl())
-                        .setAdminUrl(operator.adminUrl())
-                        .setTopicPattern(topicPattern, RegexSubscriptionMode.AllTopics)
-                        .setSubscriptionType(Exclusive)
-                        .setSubscriptionName("flink-pulsar-multiple-topic-test");
-        if (boundedness == Boundedness.BOUNDED) {
-            // Using latest stop cursor for making sure the source could be stopped.
-            // This is required for SourceTestSuiteBase.
-            builder.setBoundedStopCursor(StopCursor.latest());
-        }
-
-        return builder.build();
+    protected String displayName() {
+        return "consuming message on multiple topic";
     }
 
     @Override
-    public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
-        String topicName = topicPattern.replace("[0-9]+", String.valueOf(numTopics));
-        operator.createTopic(topicName, 1);
-
-        String partitionName = TopicNameUtils.topicNameWithPartition(topicName, 0);
-        TopicPartition partition = new TopicPartition(partitionName, 0, createFullRange());
-        PulsarPartitionDataWriter writer =
-                new PulsarPartitionDataWriter(operator.client(), partition);
-
-        topicNameToSplitWriters.put(partitionName, writer);
-        numTopics++;
-
-        return writer;
+    protected String subscriptionName() {
+        return "flink-pulsar-multiple-topic-test";
     }
 
     @Override
-    public Collection<String> generateTestData(int splitIndex, long seed) {
-        return generateStringTestData(splitIndex, seed);
-    }
-
-    @Override
-    public void close() throws Exception {
-        for (SourceSplitDataWriter<String> writer : topicNameToSplitWriters.values()) {
-            writer.close();
-        }
-
-        topicNameToSplitWriters.clear();
+    protected SubscriptionType subscriptionType() {
+        return SubscriptionType.Exclusive;
     }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java
new file mode 100644
index 0000000..a0801ec
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.cases;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.connector.pulsar.source.PulsarSource;
+import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
+import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
+import static org.apache.pulsar.client.api.Schema.STRING;
+
+/**
+ * Pulsar external context template that will create multiple topics with only one partitions as
+ * source splits.
+ */
+public abstract class MultipleTopicTemplateContext extends PulsarTestContext<String> {
+    private static final long serialVersionUID = 7333807392445848344L;
+
+    private int numTopics = 0;
+
+    private final String topicPattern = "pulsar-multiple-topic-[0-9]+-" + randomAlphabetic(8);
+
+    private final Map<String, SourceSplitDataWriter<String>> topicNameToSplitWriters =
+            new HashMap<>();
+
+    public MultipleTopicTemplateContext(PulsarTestEnvironment environment) {
+        super(environment);
+    }
+
+    @Override
+    public Source<String, ?, ?> createSource(Boundedness boundedness) {
+        PulsarSourceBuilder<String> builder =
+                PulsarSource.builder()
+                        .setDeserializationSchema(pulsarSchema(STRING))
+                        .setServiceUrl(serviceUrl())
+                        .setAdminUrl(adminUrl())
+                        .setTopicPattern(topicPattern, RegexSubscriptionMode.AllTopics)
+                        .setSubscriptionType(subscriptionType())
+                        .setSubscriptionName(subscriptionName());
+        if (boundedness == Boundedness.BOUNDED) {
+            // Using latest stop cursor for making sure the source could be stopped.
+            // This is required for SourceTestSuiteBase.
+            builder.setBoundedStopCursor(StopCursor.latest());
+        }
+
+        return builder.build();
+    }
+
+    @Override
+    public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
+        String topicName = topicPattern.replace("[0-9]+", String.valueOf(numTopics));
+        operator.createTopic(topicName, 1);
+
+        String partitionName = TopicNameUtils.topicNameWithPartition(topicName, 0);
+        PulsarPartitionDataWriter writer = new PulsarPartitionDataWriter(operator, partitionName);
+
+        topicNameToSplitWriters.put(partitionName, writer);
+        numTopics++;
+
+        return writer;
+    }
+
+    @Override
+    public Collection<String> generateTestData(int splitIndex, long seed) {
+        return generateStringTestData(splitIndex, seed);
+    }
+
+    @Override
+    public void close() throws Exception {
+        for (SourceSplitDataWriter<String> writer : topicNameToSplitWriters.values()) {
+            writer.close();
+        }
+
+        topicNameToSplitWriters.clear();
+    }
+
+    protected abstract String subscriptionName();
+
+    protected abstract SubscriptionType subscriptionType();
+
+    protected String serviceUrl() {
+        return operator.serviceUrl();
+    }
+
+    protected String adminUrl() {
+        return operator.adminUrl();
+    }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
index cb1b582..b89511c 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
@@ -23,7 +23,7 @@
 import org.apache.flink.connector.pulsar.source.PulsarSource;
 import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
 import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
 import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
@@ -34,7 +34,6 @@
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
 import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
 import static org.apache.pulsar.client.api.Schema.STRING;
 import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
@@ -44,6 +43,7 @@
  * source splits.
  */
 public class SingleTopicConsumingContext extends PulsarTestContext<String> {
+    private static final long serialVersionUID = 2754642285356345741L;
 
     private static final String TOPIC_NAME_PREFIX = "pulsar-single-topic";
     private final String topicName;
@@ -53,12 +53,17 @@
     private int numSplits = 0;
 
     public SingleTopicConsumingContext(PulsarTestEnvironment environment) {
-        super("consuming message on single topic", environment);
+        super(environment);
         this.topicName =
                 TOPIC_NAME_PREFIX + "-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
     }
 
     @Override
+    protected String displayName() {
+        return "consuming message on single topic";
+    }
+
+    @Override
     public Source<String, ?, ?> createSource(Boundedness boundedness) {
         PulsarSourceBuilder<String> builder =
                 PulsarSource.builder()
@@ -88,9 +93,8 @@
             operator.increaseTopicPartitions(topicName, numSplits);
         }
 
-        TopicPartition partition = new TopicPartition(topicName, numSplits - 1, createFullRange());
-        PulsarPartitionDataWriter writer =
-                new PulsarPartitionDataWriter(operator.client(), partition);
+        String partitionName = TopicNameUtils.topicNameWithPartition(topicName, numSplits - 1);
+        PulsarPartitionDataWriter writer = new PulsarPartitionDataWriter(operator, partitionName);
         partitionToSplitWriter.put(numSplits - 1, writer);
 
         return writer;
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java
index 986f4bd..d46658e 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java
@@ -18,33 +18,36 @@
 
 package org.apache.flink.connector.pulsar.testutils.runtime;
 
-import org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerProvider;
-import org.apache.flink.connector.pulsar.testutils.runtime.mock.PulsarMockProvider;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime;
+import org.apache.flink.connector.pulsar.testutils.runtime.mock.PulsarMockRuntime;
 
-import java.util.function.Supplier;
+import org.testcontainers.containers.GenericContainer;
 
 /**
- * A enum class for providing a operable pulsar runtime. We support two types of runtime, the
- * container and mock.
+ * An abstraction for different pulsar runtimes. Providing the common methods for {@link
+ * PulsarTestEnvironment}.
  */
-public enum PulsarRuntime {
+public interface PulsarRuntime {
 
-    /**
-     * The whole pulsar cluster would run in a docker container, provide the full fledged test
-     * backend.
-     */
-    CONTAINER(PulsarContainerProvider::new),
+    /** Start up this pulsar runtime, block the thread until everytime is ready for this runtime. */
+    void startUp();
 
-    /** The bookkeeper and zookeeper would use a mock backend, and start a single pulsar broker. */
-    MOCK(PulsarMockProvider::new);
+    /** Shutdown this pulsar runtime. */
+    void tearDown();
 
-    private final Supplier<PulsarRuntimeProvider> provider;
+    /** Return a operator for operating this pulsar runtime. */
+    PulsarRuntimeOperator operator();
 
-    PulsarRuntime(Supplier<PulsarRuntimeProvider> provider) {
-        this.provider = provider;
+    static PulsarRuntime mock() {
+        return new PulsarMockRuntime();
     }
 
-    public PulsarRuntimeProvider provider() {
-        return provider.get();
+    static PulsarRuntime container() {
+        return new PulsarContainerRuntime();
+    }
+
+    static PulsarRuntime container(GenericContainer<?> flinkContainer) {
+        return new PulsarContainerRuntime().bindWithFlinkContainer(flinkContainer);
     }
 }
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
index a68c065..2d26925 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
@@ -24,6 +24,8 @@
 import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
 import org.apache.flink.connectors.test.common.external.ExternalContext;
 
+import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
+
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.MessageId;
@@ -148,13 +150,30 @@
         return messageIds.get(0);
     }
 
+    public <T> MessageId sendMessage(String topic, Schema<T> schema, String key, T message) {
+        List<MessageId> messageIds = sendMessages(topic, schema, key, singletonList(message));
+        checkArgument(messageIds.size() == 1);
+
+        return messageIds.get(0);
+    }
+
     public <T> List<MessageId> sendMessages(
             String topic, Schema<T> schema, Collection<T> messages) {
+        return sendMessages(topic, schema, null, messages);
+    }
+
+    public <T> List<MessageId> sendMessages(
+            String topic, Schema<T> schema, String key, Collection<T> messages) {
         try (Producer<T> producer = client().newProducer(schema).topic(topic).create()) {
             List<MessageId> messageIds = new ArrayList<>(messages.size());
 
             for (T message : messages) {
-                MessageId messageId = producer.newMessage().value(message).send();
+                MessageId messageId;
+                if (Strings.isNullOrEmpty(key)) {
+                    messageId = producer.newMessage().value(message).send();
+                } else {
+                    messageId = producer.newMessage().key(key).value(message).send();
+                }
                 messageIds.add(messageId);
             }
 
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeProvider.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeProvider.java
deleted file mode 100644
index d8ad718..0000000
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeProvider.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.testutils.runtime;
-
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-
-/**
- * A abstraction for different pulsar runtimes. Providing the common methods for {@link
- * PulsarTestEnvironment}.
- */
-public interface PulsarRuntimeProvider {
-
-    /** Start up this pulsar runtime, block the thread until everytime is ready for this runtime. */
-    void startUp();
-
-    /** Shutdown this pulsar runtime. */
-    void tearDown();
-
-    /** Return a operator for operating this pulsar runtime. */
-    PulsarRuntimeOperator operator();
-}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerProvider.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
similarity index 75%
rename from flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerProvider.java
rename to flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
index 06be3cb..5560767 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerProvider.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.connector.pulsar.testutils.runtime.container;
 
+import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
 import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
-import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeProvider;
 import org.apache.flink.util.DockerImageVersions;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.PulsarContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
@@ -36,13 +37,22 @@
 import static org.apache.flink.util.DockerImageVersions.PULSAR;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.testcontainers.containers.PulsarContainer.BROKER_HTTP_PORT;
+import static org.testcontainers.containers.PulsarContainer.BROKER_PORT;
 
 /**
- * {@link PulsarRuntimeProvider} implementation, use the TestContainers as the backend. We would
- * start a pulsar container by this provider.
+ * {@link PulsarRuntime} implementation, use the TestContainers as the backend. We would start a
+ * pulsar container by this provider.
  */
-public class PulsarContainerProvider implements PulsarRuntimeProvider {
-    private static final Logger LOG = LoggerFactory.getLogger(PulsarContainerProvider.class);
+public class PulsarContainerRuntime implements PulsarRuntime {
+    private static final Logger LOG = LoggerFactory.getLogger(PulsarContainerRuntime.class);
+    private static final String PULSAR_INTERNAL_HOSTNAME = "pulsar";
+
+    // This url is used on the container side.
+    public static final String PULSAR_SERVICE_URL =
+            String.format("pulsar://%s:%d", PULSAR_INTERNAL_HOSTNAME, BROKER_PORT);
+    // This url is used on the container side.
+    public static final String PULSAR_ADMIN_URL =
+            String.format("http://%s:%d", PULSAR_INTERNAL_HOSTNAME, BROKER_HTTP_PORT);
 
     /**
      * Create a pulsar container provider by a predefined version, this constance {@link
@@ -52,6 +62,14 @@
 
     private PulsarRuntimeOperator operator;
 
+    public PulsarContainerRuntime bindWithFlinkContainer(GenericContainer<?> flinkContainer) {
+        this.container
+                .withNetworkAliases(PULSAR_INTERNAL_HOSTNAME)
+                .dependsOn(flinkContainer)
+                .withNetwork(flinkContainer.getNetwork());
+        return this;
+    }
+
     @Override
     public void startUp() {
         // Prepare Pulsar Container.
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockProvider.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
similarity index 76%
rename from flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockProvider.java
rename to flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
index 1cb8ce9..552ce42 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockProvider.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.connector.pulsar.testutils.runtime.mock;
 
+import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
 import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
-import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeProvider;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
 
@@ -39,17 +39,17 @@
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Providing a mocked pulsar server. */
-public class PulsarMockProvider implements PulsarRuntimeProvider {
+public class PulsarMockRuntime implements PulsarRuntime {
 
     private static final String CLUSTER_NAME = "mock-pulsar-" + randomAlphanumeric(6);
     private final MockPulsarService pulsarService;
     private PulsarRuntimeOperator operator;
 
-    public PulsarMockProvider() {
+    public PulsarMockRuntime() {
         this(createConfig());
     }
 
-    public PulsarMockProvider(ServiceConfiguration configuration) {
+    public PulsarMockRuntime(ServiceConfiguration configuration) {
         this.pulsarService = new MockPulsarService(configuration);
     }
 
@@ -95,20 +95,34 @@
             admin.clusters().createCluster(CLUSTER_NAME, data);
         }
 
+        createOrUpdateTenant("public");
+        createOrUpdateNamespace("public", "default");
+
+        createOrUpdateTenant("pulsar");
+        createOrUpdateNamespace("pulsar", "system");
+    }
+
+    private void createOrUpdateTenant(String tenant) throws PulsarAdminException {
+        PulsarAdmin admin = operator().admin();
         TenantInfo info =
                 TenantInfo.builder()
                         .adminRoles(ImmutableSet.of("appid1", "appid2"))
                         .allowedClusters(ImmutableSet.of(CLUSTER_NAME))
                         .build();
-        if (!admin.tenants().getTenants().contains("public")) {
-            admin.tenants().createTenant("public", info);
+        if (!admin.tenants().getTenants().contains(tenant)) {
+            admin.tenants().createTenant(tenant, info);
         } else {
-            admin.tenants().updateTenant("public", info);
+            admin.tenants().updateTenant(tenant, info);
         }
+    }
 
-        if (!admin.namespaces().getNamespaces("public").contains("public/default")) {
-            admin.namespaces().createNamespace("public/default");
-            admin.namespaces().setRetention("public/default", new RetentionPolicies(60, 1000));
+    public void createOrUpdateNamespace(String tenant, String namespace)
+            throws PulsarAdminException {
+        PulsarAdmin admin = operator().admin();
+        String namespaceValue = tenant + "/" + namespace;
+        if (!admin.namespaces().getNamespaces(tenant).contains(namespaceValue)) {
+            admin.namespaces().createNamespace(namespaceValue);
+            admin.namespaces().setRetention(namespaceValue, new RetentionPolicies(60, 1000));
         }
     }
 
@@ -135,6 +149,13 @@
         configuration.setBrokerServicePort(Optional.of(findAvailablePort()));
         configuration.setWebServicePort(Optional.of(findAvailablePort()));
 
+        // Enable transaction with in memory.
+        configuration.setTransactionCoordinatorEnabled(true);
+        configuration.setTransactionMetadataStoreProviderClassName(
+                "org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider");
+        configuration.setTransactionBufferProviderClassName(
+                "org.apache.pulsar.broker.transaction.buffer.impl.InMemTransactionBufferProvider");
+
         return configuration;
     }
 }
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainerTestEnvironment.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainerTestEnvironment.java
index 070bd82..9630003 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainerTestEnvironment.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainerTestEnvironment.java
@@ -42,10 +42,7 @@
     public FlinkContainerTestEnvironment(
             int numTaskManagers, int numSlotsPerTaskManager, String... jarPath) {
 
-        Configuration flinkConfiguration = new Configuration();
-        flinkConfiguration.set(HEARTBEAT_INTERVAL, 1000L);
-        flinkConfiguration.set(HEARTBEAT_TIMEOUT, 5000L);
-        flinkConfiguration.set(SLOT_REQUEST_TIMEOUT, 10000L);
+        Configuration flinkConfiguration = flinkConfiguration();
         flinkConfiguration.set(NUM_TASK_SLOTS, numSlotsPerTaskManager);
 
         this.flinkContainer =
@@ -113,4 +110,13 @@
     public FlinkContainer getFlinkContainer() {
         return this.flinkContainer;
     }
+
+    protected Configuration flinkConfiguration() {
+        Configuration flinkConfiguration = new Configuration();
+        flinkConfiguration.set(HEARTBEAT_INTERVAL, 1000L);
+        flinkConfiguration.set(HEARTBEAT_TIMEOUT, 5000L);
+        flinkConfiguration.set(SLOT_REQUEST_TIMEOUT, 10000L);
+
+        return flinkConfiguration;
+    }
 }
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml
new file mode 100644
index 0000000..269f89c
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.15-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-end-to-end-tests-pulsar</artifactId>
+	<name>Flink : E2E Tests : Pulsar</name>
+
+	<properties>
+		<pulsar.version>2.8.0</pulsar.version>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-end-to-end-tests-common</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-pulsar_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-pulsar_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<type>test-jar</type>
+		</dependency>
+		<dependency>
+			<groupId>org.testcontainers</groupId>
+			<artifactId>pulsar</artifactId>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-dependency-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>copy</id>
+						<phase>pre-integration-test</phase>
+						<goals>
+							<goal>copy</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<artifactItems>
+						<artifactItem>
+							<groupId>org.apache.flink</groupId>
+							<artifactId>flink-connector-pulsar_${scala.binary.version}</artifactId>
+							<version>${project.version}</version>
+							<destFileName>pulsar-connector.jar</destFileName>
+							<type>jar</type>
+							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+						</artifactItem>
+						<artifactItem>
+							<groupId>org.apache.pulsar</groupId>
+							<artifactId>pulsar-client-all</artifactId>
+							<version>${pulsar.version}</version>
+							<destFileName>pulsar-client-all.jar</destFileName>
+							<type>jar</type>
+							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+						</artifactItem>
+						<artifactItem>
+							<groupId>org.apache.pulsar</groupId>
+							<artifactId>pulsar-client-api</artifactId>
+							<version>${pulsar.version}</version>
+							<destFileName>pulsar-client-api.jar</destFileName>
+							<type>jar</type>
+							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+						</artifactItem>
+						<artifactItem>
+							<groupId>org.apache.pulsar</groupId>
+							<artifactId>pulsar-client-admin-api</artifactId>
+							<version>${pulsar.version}</version>
+							<destFileName>pulsar-admin-api.jar</destFileName>
+							<type>jar</type>
+							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+						</artifactItem>
+						<artifactItem>
+							<groupId>org.slf4j</groupId>
+							<artifactId>jul-to-slf4j</artifactId>
+							<version>${slf4j.version}</version>
+							<destFileName>jul-to-slf4j.jar</destFileName>
+							<type>jar</type>
+							<outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+						</artifactItem>
+					</artifactItems>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+</project>
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
new file mode 100644
index 0000000..1427c2b
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar;
+
+import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connectors.test.common.junit.annotations.ExternalContextFactory;
+import org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem;
+import org.apache.flink.connectors.test.common.junit.annotations.TestEnv;
+import org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase;
+import org.apache.flink.tests.util.pulsar.cases.ExclusiveSubscriptionContext;
+import org.apache.flink.tests.util.pulsar.cases.FailoverSubscriptionContext;
+import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
+
+import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime.container;
+
+/**
+ * Pulsar E2E test based on connector testing framework. It's used for Failover & Exclusive
+ * subscription.
+ */
+public class PulsarSourceOrderedE2ECase extends SourceTestSuiteBase<String> {
+
+    // Defines TestEnvironment.
+    @TestEnv
+    FlinkContainerWithPulsarEnvironment flink = new FlinkContainerWithPulsarEnvironment(1, 6);
+
+    // Defines ConnectorExternalSystem.
+    @ExternalSystem
+    PulsarTestEnvironment pulsar = new PulsarTestEnvironment(container(flink.getFlinkContainer()));
+
+    // Defines a set of external context Factories for different test cases.
+    @ExternalContextFactory
+    PulsarTestContextFactory<String, ExclusiveSubscriptionContext> exclusive =
+            new PulsarTestContextFactory<>(pulsar, ExclusiveSubscriptionContext::new);
+
+    @ExternalContextFactory
+    PulsarTestContextFactory<String, FailoverSubscriptionContext> failover =
+            new PulsarTestContextFactory<>(pulsar, FailoverSubscriptionContext::new);
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
new file mode 100644
index 0000000..25cab21
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar;
+
+import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connectors.test.common.junit.annotations.ExternalContextFactory;
+import org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem;
+import org.apache.flink.connectors.test.common.junit.annotations.TestEnv;
+import org.apache.flink.tests.util.pulsar.cases.KeySharedSubscriptionContext;
+import org.apache.flink.tests.util.pulsar.cases.SharedSubscriptionContext;
+import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
+import org.apache.flink.tests.util.pulsar.common.UnorderedSourceTestSuiteBase;
+
+import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime.container;
+
+/**
+ * Pulsar E2E test based on connector testing framework. It's used for Shared & Key_Shared
+ * subscription.
+ */
+public class PulsarSourceUnorderedE2ECase extends UnorderedSourceTestSuiteBase<String> {
+
+    // Defines TestEnvironment.
+    @TestEnv
+    FlinkContainerWithPulsarEnvironment flink = new FlinkContainerWithPulsarEnvironment(1, 8);
+
+    // Defines ConnectorExternalSystem.
+    @ExternalSystem
+    PulsarTestEnvironment pulsar = new PulsarTestEnvironment(container(flink.getFlinkContainer()));
+
+    // Defines a set of external context Factories for different test cases.
+    @ExternalContextFactory
+    PulsarTestContextFactory<String, SharedSubscriptionContext> shared =
+            new PulsarTestContextFactory<>(pulsar, SharedSubscriptionContext::new);
+
+    @ExternalContextFactory
+    PulsarTestContextFactory<String, KeySharedSubscriptionContext> keyShared =
+            new PulsarTestContextFactory<>(pulsar, KeySharedSubscriptionContext::new);
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
new file mode 100644
index 0000000..18b2ffc
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar.cases;
+
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicTemplateContext;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
+import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
+
+/** We would consuming from test splits by using {@link SubscriptionType#Exclusive} subscription. */
+public class ExclusiveSubscriptionContext extends MultipleTopicTemplateContext {
+    private static final long serialVersionUID = 6238209089442257487L;
+
+    public ExclusiveSubscriptionContext(PulsarTestEnvironment environment) {
+        super(environment);
+    }
+
+    @Override
+    protected String displayName() {
+        return "consuming message by Exclusive";
+    }
+
+    @Override
+    protected String subscriptionName() {
+        return "pulsar-exclusive-subscription";
+    }
+
+    @Override
+    protected SubscriptionType subscriptionType() {
+        return SubscriptionType.Exclusive;
+    }
+
+    @Override
+    protected String serviceUrl() {
+        return PULSAR_SERVICE_URL;
+    }
+
+    @Override
+    protected String adminUrl() {
+        return PULSAR_ADMIN_URL;
+    }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
new file mode 100644
index 0000000..c322efa
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar.cases;
+
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicTemplateContext;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
+import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
+
+/** We would consuming from test splits by using {@link SubscriptionType#Failover} subscription. */
+public class FailoverSubscriptionContext extends MultipleTopicTemplateContext {
+    private static final long serialVersionUID = 6238209089442257487L;
+
+    public FailoverSubscriptionContext(PulsarTestEnvironment environment) {
+        super(environment);
+    }
+
+    @Override
+    protected String displayName() {
+        return "consuming message by Failover";
+    }
+
+    @Override
+    protected String subscriptionName() {
+        return "pulsar-failover-subscription";
+    }
+
+    @Override
+    protected SubscriptionType subscriptionType() {
+        return SubscriptionType.Failover;
+    }
+
+    @Override
+    protected String serviceUrl() {
+        return PULSAR_SERVICE_URL;
+    }
+
+    @Override
+    protected String adminUrl() {
+        return PULSAR_ADMIN_URL;
+    }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
new file mode 100644
index 0000000..e442418
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar.cases;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.connector.pulsar.source.PulsarSource;
+import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FixedRangeGenerator;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+import org.apache.flink.tests.util.pulsar.common.KeyedPulsarPartitionDataWriter;
+
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static java.util.Collections.singletonList;
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE;
+import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
+import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
+import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
+import static org.apache.pulsar.client.api.Schema.STRING;
+
+/**
+ * We would consuming from test splits by using {@link SubscriptionType#Key_Shared} subscription.
+ */
+public class KeySharedSubscriptionContext extends PulsarTestContext<String> {
+    private static final long serialVersionUID = 3246516520107893983L;
+
+    private int index = 0;
+
+    private final List<KeyedPulsarPartitionDataWriter> writers = new ArrayList<>();
+
+    // Message keys.
+    private final String key1;
+    private final String key2;
+
+    public KeySharedSubscriptionContext(PulsarTestEnvironment environment) {
+        super(environment);
+
+        // Init message keys.
+        this.key1 = randomAlphabetic(8);
+        String newKey2;
+        do {
+            newKey2 = randomAlphabetic(8);
+        } while (keyHash(key1) == keyHash(newKey2));
+        this.key2 = newKey2;
+    }
+
+    @Override
+    protected String displayName() {
+        return "consuming message by Key_Shared";
+    }
+
+    @Override
+    public Source<String, ?, ?> createSource(Boundedness boundedness) {
+        int keyHash = keyHash(key1);
+        TopicRange range = new TopicRange(keyHash, keyHash);
+
+        PulsarSourceBuilder<String> builder =
+                PulsarSource.builder()
+                        .setDeserializationSchema(pulsarSchema(STRING))
+                        .setServiceUrl(PULSAR_SERVICE_URL)
+                        .setAdminUrl(PULSAR_ADMIN_URL)
+                        .setTopicPattern(
+                                "pulsar-[0-9]+-key-shared", RegexSubscriptionMode.AllTopics)
+                        .setSubscriptionType(SubscriptionType.Key_Shared)
+                        .setSubscriptionName("pulsar-key-shared")
+                        .setRangeGenerator(new FixedRangeGenerator(singletonList(range)));
+        if (boundedness == Boundedness.BOUNDED) {
+            // Using latest stop cursor for making sure the source could be stopped.
+            builder.setBoundedStopCursor(StopCursor.latest());
+        }
+
+        return builder.build();
+    }
+
+    @Override
+    public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
+        String topicName = "pulsar-" + index + "-key-shared";
+        operator.createTopic(topicName, 1);
+        index++;
+
+        String partitionName = TopicNameUtils.topicNameWithPartition(topicName, 0);
+        KeyedPulsarPartitionDataWriter writer =
+                new KeyedPulsarPartitionDataWriter(operator, partitionName, key1, key2);
+        writers.add(writer);
+
+        return writer;
+    }
+
+    @Override
+    public Collection<String> generateTestData(int splitIndex, long seed) {
+        return generateStringTestData(splitIndex, seed);
+    }
+
+    @Override
+    public void close() {
+        for (KeyedPulsarPartitionDataWriter writer : writers) {
+            writer.close();
+        }
+        writers.clear();
+    }
+
+    private int keyHash(String key) {
+        return Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % RANGE_SIZE;
+    }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
new file mode 100644
index 0000000..f936b6f
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar.cases;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.connector.pulsar.source.PulsarSource;
+import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
+import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
+import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
+import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
+import static org.apache.pulsar.client.api.Schema.STRING;
+
+/** We would consuming from test splits by using {@link SubscriptionType#Shared} subscription. */
+public class SharedSubscriptionContext extends PulsarTestContext<String> {
+    private static final long serialVersionUID = -2798707923661295245L;
+
+    private int index = 0;
+
+    private final List<PulsarPartitionDataWriter> writers = new ArrayList<>();
+
+    public SharedSubscriptionContext(PulsarTestEnvironment environment) {
+        super(environment);
+    }
+
+    @Override
+    protected String displayName() {
+        return "consuming message by Shared";
+    }
+
+    @Override
+    public Source<String, ?, ?> createSource(Boundedness boundedness) {
+        PulsarSourceBuilder<String> builder =
+                PulsarSource.builder()
+                        .setDeserializationSchema(pulsarSchema(STRING))
+                        .setServiceUrl(PULSAR_SERVICE_URL)
+                        .setAdminUrl(PULSAR_ADMIN_URL)
+                        .setTopicPattern("pulsar-[0-9]+-shared", RegexSubscriptionMode.AllTopics)
+                        .setSubscriptionType(SubscriptionType.Shared)
+                        .setSubscriptionName("pulsar-shared");
+        if (boundedness == Boundedness.BOUNDED) {
+            // Using latest stop cursor for making sure the source could be stopped.
+            builder.setBoundedStopCursor(StopCursor.latest());
+        }
+
+        return builder.build();
+    }
+
+    @Override
+    public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
+        String topicName = "pulsar-" + index + "-shared";
+        operator.createTopic(topicName, 1);
+        index++;
+
+        String partitionName = TopicNameUtils.topicNameWithPartition(topicName, 0);
+        PulsarPartitionDataWriter writer = new PulsarPartitionDataWriter(operator, partitionName);
+        writers.add(writer);
+
+        return writer;
+    }
+
+    @Override
+    public Collection<String> generateTestData(int splitIndex, long seed) {
+        return generateStringTestData(splitIndex, seed);
+    }
+
+    @Override
+    public void close() {
+        for (PulsarPartitionDataWriter writer : writers) {
+            writer.close();
+        }
+        writers.clear();
+    }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
new file mode 100644
index 0000000..890d09e
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar.common;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.FlinkContainerTestEnvironment;
+
+import static org.apache.flink.configuration.TaskManagerOptions.TASK_OFF_HEAP_MEMORY;
+
+/** A Flink Container which would bundles pulsar connector in its classpath. */
+public class FlinkContainerWithPulsarEnvironment extends FlinkContainerTestEnvironment {
+
+    public FlinkContainerWithPulsarEnvironment(int numTaskManagers, int numSlotsPerTaskManager) {
+        super(
+                numTaskManagers,
+                numSlotsPerTaskManager,
+                resourcePath("pulsar-connector.jar"),
+                resourcePath("pulsar-client-all.jar"),
+                resourcePath("pulsar-client-api.jar"),
+                resourcePath("pulsar-admin-api.jar"),
+                resourcePath("jul-to-slf4j.jar"));
+    }
+
+    private static String resourcePath(String jarName) {
+        return TestUtils.getResource(jarName).toAbsolutePath().toString();
+    }
+
+    @Override
+    protected Configuration flinkConfiguration() {
+        Configuration configuration = super.flinkConfiguration();
+        // Increase the off heap memory for avoiding direct buffer memory error on Pulsar e2e tests.
+        configuration.set(TASK_OFF_HEAP_MEMORY, MemorySize.ofMebiBytes(100));
+
+        return configuration;
+    }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
new file mode 100644
index 0000000..eea97e6
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar.common;
+
+import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+
+import org.apache.pulsar.client.api.Schema;
+
+import java.util.Collection;
+import java.util.List;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Source split data writer for writing test data into a Pulsar topic partition. It will write the
+ * message with two keys.
+ */
+public class KeyedPulsarPartitionDataWriter implements SourceSplitDataWriter<String> {
+
+    private final PulsarRuntimeOperator operator;
+    private final String fullTopicName;
+    private final String key1;
+    private final String key2;
+
+    public KeyedPulsarPartitionDataWriter(
+            PulsarRuntimeOperator operator, String fullTopicName, String key1, String key2) {
+        this.operator = operator;
+        this.fullTopicName = fullTopicName;
+        this.key1 = key1;
+        this.key2 = key2;
+    }
+
+    @Override
+    public void writeRecords(Collection<String> records) {
+        operator.sendMessages(fullTopicName, Schema.STRING, key1, records);
+
+        List<String> newRecords = records.stream().map(a -> a + key1).collect(toList());
+        operator.sendMessages(fullTopicName, Schema.STRING, key2, newRecords);
+    }
+
+    @Override
+    public void close() {
+        // Nothing to do.
+    }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java
new file mode 100644
index 0000000..c452fe6
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar.common;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.connectors.test.common.environment.TestEnvironment;
+import org.apache.flink.connectors.test.common.external.ExternalContext;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+import org.apache.flink.connectors.test.common.junit.extensions.ConnectorTestingExtension;
+import org.apache.flink.connectors.test.common.junit.extensions.TestCaseInvocationContextProvider;
+import org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+/** A source test template for testing the messages which could be consumed in a unordered way. */
+@ExtendWith({
+    ConnectorTestingExtension.class,
+    TestLoggerExtension.class,
+    TestCaseInvocationContextProvider.class
+})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class UnorderedSourceTestSuiteBase<T> {
+
+    @TestTemplate
+    @DisplayName("Test source with one split and four consumers")
+    public void testOneSplitWithMultipleConsumers(
+            TestEnvironment testEnv, ExternalContext<T> externalContext) throws Exception {
+        Collection<T> testData =
+                externalContext.generateTestData(0, ThreadLocalRandom.current().nextLong());
+        SourceSplitDataWriter<T> writer = externalContext.createSourceSplitDataWriter();
+        writer.writeRecords(testData);
+
+        Source<T, ?, ?> source = externalContext.createSource(Boundedness.BOUNDED);
+        StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment();
+        List<T> results =
+                execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar source")
+                        .setParallelism(4)
+                        .executeAndCollect(
+                                "Source single split with four readers.", testData.size());
+
+        assertThat(results, containsInAnyOrder(testData.toArray()));
+    }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/resources/log4j2-test.properties b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..835c2ec
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 8aada21..84ca52e 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -88,6 +88,7 @@
 		<module>flink-python-test</module>
 		<module>flink-end-to-end-tests-hbase</module>
 		<module>flink-glue-schema-registry-test</module>
+		<module>flink-end-to-end-tests-pulsar</module>
 	</modules>
 
 	<dependencyManagement>
diff --git a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist b/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist
index 521d055..110ba08 100644
--- a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist
+++ b/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist
@@ -39,3 +39,4 @@
 flink-elasticsearch5-test
 flink-high-parallelism-iterations-test
 flink-end-to-end-tests-common-kafka
+flink-end-to-end-tests-pulsar