[FLINK-23969][connector/pulsar] Create e2e tests for pulsar connector.
diff --git a/flink-connectors/flink-connector-pulsar/pom.xml b/flink-connectors/flink-connector-pulsar/pom.xml
index 9992ef9..c3e5e9a 100644
--- a/flink-connectors/flink-connector-pulsar/pom.xml
+++ b/flink-connectors/flink-connector-pulsar/pom.xml
@@ -169,6 +169,12 @@
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-all</artifactId>
<version>${pulsar.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-package-core</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
</dependencies>
@@ -258,6 +264,47 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ <configuration>
+ <includes>
+ <include>**/testutils/**</include>
+ <include>META-INF/LICENSE</include>
+ <include>META-INF/NOTICE</include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-source-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>attach-test-sources</id>
+ <goals>
+ <goal>test-jar-no-fork</goal>
+ </goals>
+ <configuration>
+ <archive>
+ <!-- Globally exclude maven metadata, because it may accidentally bundle files we don't intend to -->
+ <addMavenDescriptor>false</addMavenDescriptor>
+ </archive>
+ <includes>
+ <include>**/testutils/**</include>
+ <include>META-INF/LICENSE</include>
+ <include>META-INF/NOTICE</include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
</project>
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicRange.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicRange.java
index 1d1574b..5b77922 100644
--- a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicRange.java
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/TopicRange.java
@@ -57,6 +57,8 @@
public TopicRange(int start, int end) {
checkArgument(start >= MIN_RANGE, "Start range %s shouldn't below zero.", start);
checkArgument(end <= MAX_RANGE, "End range %s shouldn't exceed 65535.", end);
+ checkArgument(start <= end, "Range end must >= range start.");
+
this.start = start;
this.end = end;
}
diff --git a/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedRangeGenerator.java b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedRangeGenerator.java
new file mode 100644
index 0000000..6f82725
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/source/enumerator/topic/range/FixedRangeGenerator.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.source.enumerator.topic.range;
+
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicMetadata;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
+
+import java.util.List;
+
+/** Always return the same range set for all topics. */
+public class FixedRangeGenerator implements RangeGenerator {
+ private static final long serialVersionUID = -3895203007855538734L;
+
+ private final List<TopicRange> ranges;
+
+ public FixedRangeGenerator(List<TopicRange> ranges) {
+ this.ranges = ranges;
+ }
+
+ @Override
+ public List<TopicRange> range(TopicMetadata metadata, int parallelism) {
+ return ranges;
+ }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
index 89457dc..8cc2e0a 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
@@ -37,7 +37,7 @@
@TestEnv MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
// Defines pulsar running environment
- @ExternalSystem PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.MOCK);
+ @ExternalSystem PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.mock());
// Defines a external context Factories,
// so test cases will be invoked using this external contexts.
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarPartitionDataWriter.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarPartitionDataWriter.java
index ea9c4ab..c2afee5 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarPartitionDataWriter.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarPartitionDataWriter.java
@@ -18,41 +18,31 @@
package org.apache.flink.connector.pulsar.testutils;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import java.util.Collection;
-import static org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils.sneakyClient;
-
/** Source split data writer for writing test data into a Pulsar topic partition. */
public class PulsarPartitionDataWriter implements SourceSplitDataWriter<String> {
- private final Producer<String> producer;
+ private final PulsarRuntimeOperator operator;
+ private final String fullTopicName;
- public PulsarPartitionDataWriter(PulsarClient client, TopicPartition partition) {
- try {
- this.producer =
- client.newProducer(Schema.STRING).topic(partition.getFullTopicName()).create();
- } catch (PulsarClientException e) {
- throw new IllegalStateException(e);
- }
+ public PulsarPartitionDataWriter(PulsarRuntimeOperator operator, String fullTopicName) {
+ this.operator = operator;
+ this.fullTopicName = fullTopicName;
}
@Override
public void writeRecords(Collection<String> records) {
- for (String record : records) {
- sneakyClient(() -> producer.newMessage().value(record).send());
- }
+ operator.sendMessages(fullTopicName, Schema.STRING, records);
}
@Override
- public void close() throws Exception {
- producer.close();
+ public void close() {
+ // Nothing to do.
}
}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
index a80d721..2ad4c2f 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestContext.java
@@ -29,15 +29,14 @@
/** Common test context for pulsar based test. */
public abstract class PulsarTestContext<T> implements ExternalContext<T> {
+ private static final long serialVersionUID = 4717940854368532130L;
private static final int NUM_RECORDS_UPPER_BOUND = 500;
private static final int NUM_RECORDS_LOWER_BOUND = 100;
- private final String displayName;
protected final PulsarRuntimeOperator operator;
- protected PulsarTestContext(String displayName, PulsarTestEnvironment environment) {
- this.displayName = displayName;
+ protected PulsarTestContext(PulsarTestEnvironment environment) {
this.operator = environment.operator();
}
@@ -58,8 +57,10 @@
return records;
}
+ protected abstract String displayName();
+
@Override
public String toString() {
- return displayName;
+ return displayName();
}
}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestEnvironment.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestEnvironment.java
index 7b31c7c..50ca3fe 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestEnvironment.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestEnvironment.java
@@ -20,7 +20,6 @@
import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
-import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeProvider;
import org.apache.flink.connectors.test.common.TestResource;
import org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem;
@@ -54,10 +53,10 @@
public class PulsarTestEnvironment
implements BeforeAllCallback, AfterAllCallback, TestResource, TestRule {
- private final PulsarRuntimeProvider provider;
+ private final PulsarRuntime runtime;
public PulsarTestEnvironment(PulsarRuntime runtime) {
- this.provider = runtime.provider();
+ this.runtime = runtime;
}
/** JUnit 4 Rule based test logic. */
@@ -66,7 +65,7 @@
return new Statement() {
@Override
public void evaluate() throws Throwable {
- provider.startUp();
+ runtime.startUp();
List<Throwable> errors = new ArrayList<>();
try {
@@ -75,7 +74,7 @@
errors.add(t);
} finally {
try {
- provider.tearDown();
+ runtime.tearDown();
} catch (Throwable t) {
errors.add(t);
}
@@ -88,29 +87,29 @@
/** JUnit 5 Extension setup method. */
@Override
public void beforeAll(ExtensionContext context) {
- provider.startUp();
+ runtime.startUp();
}
/** flink-connector-testing setup method. */
@Override
public void startUp() {
- provider.startUp();
+ runtime.startUp();
}
/** JUnit 5 Extension shutdown method. */
@Override
public void afterAll(ExtensionContext context) {
- provider.tearDown();
+ runtime.tearDown();
}
/** flink-connector-testing shutdown method. */
@Override
public void tearDown() {
- provider.tearDown();
+ runtime.tearDown();
}
/** Get a common supported set of method for operating pulsar which is in container. */
public PulsarRuntimeOperator operator() {
- return provider.operator();
+ return runtime.operator();
}
}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java
index 2321bd4..18a8655 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/PulsarTestSuiteBase.java
@@ -56,7 +56,7 @@
* pulsar broker. Override this method when needs.
*/
protected PulsarRuntime runtime() {
- return PulsarRuntime.MOCK;
+ return PulsarRuntime.mock();
}
/** Operate pulsar by acquiring a runtime operator. */
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
index 7ce676c..12dbabe 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicConsumingContext.java
@@ -18,96 +18,33 @@
package org.apache.flink.connector.pulsar.testutils.cases;
-import org.apache.flink.api.connector.source.Boundedness;
-import org.apache.flink.api.connector.source.Source;
-import org.apache.flink.connector.pulsar.source.PulsarSource;
-import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
-import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
-import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter;
-import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
-import org.apache.pulsar.client.api.RegexSubscriptionMode;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
-import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
-import static org.apache.pulsar.client.api.Schema.STRING;
-import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
+import org.apache.pulsar.client.api.SubscriptionType;
/**
* Pulsar external context that will create multiple topics with only one partitions as source
* splits.
*/
-public class MultipleTopicConsumingContext extends PulsarTestContext<String> {
-
- private int numTopics = 0;
-
- private final String topicPattern;
-
- private final Map<String, SourceSplitDataWriter<String>> topicNameToSplitWriters =
- new HashMap<>();
+public class MultipleTopicConsumingContext extends MultipleTopicTemplateContext {
+ private static final long serialVersionUID = -3855336888090886528L;
public MultipleTopicConsumingContext(PulsarTestEnvironment environment) {
- super("consuming message on multiple topic", environment);
- this.topicPattern =
- "pulsar-multiple-topic-[0-9]+-"
- + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
+ super(environment);
}
@Override
- public Source<String, ?, ?> createSource(Boundedness boundedness) {
- PulsarSourceBuilder<String> builder =
- PulsarSource.builder()
- .setDeserializationSchema(pulsarSchema(STRING))
- .setServiceUrl(operator.serviceUrl())
- .setAdminUrl(operator.adminUrl())
- .setTopicPattern(topicPattern, RegexSubscriptionMode.AllTopics)
- .setSubscriptionType(Exclusive)
- .setSubscriptionName("flink-pulsar-multiple-topic-test");
- if (boundedness == Boundedness.BOUNDED) {
- // Using latest stop cursor for making sure the source could be stopped.
- // This is required for SourceTestSuiteBase.
- builder.setBoundedStopCursor(StopCursor.latest());
- }
-
- return builder.build();
+ protected String displayName() {
+ return "consuming message on multiple topic";
}
@Override
- public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
- String topicName = topicPattern.replace("[0-9]+", String.valueOf(numTopics));
- operator.createTopic(topicName, 1);
-
- String partitionName = TopicNameUtils.topicNameWithPartition(topicName, 0);
- TopicPartition partition = new TopicPartition(partitionName, 0, createFullRange());
- PulsarPartitionDataWriter writer =
- new PulsarPartitionDataWriter(operator.client(), partition);
-
- topicNameToSplitWriters.put(partitionName, writer);
- numTopics++;
-
- return writer;
+ protected String subscriptionName() {
+ return "flink-pulsar-multiple-topic-test";
}
@Override
- public Collection<String> generateTestData(int splitIndex, long seed) {
- return generateStringTestData(splitIndex, seed);
- }
-
- @Override
- public void close() throws Exception {
- for (SourceSplitDataWriter<String> writer : topicNameToSplitWriters.values()) {
- writer.close();
- }
-
- topicNameToSplitWriters.clear();
+ protected SubscriptionType subscriptionType() {
+ return SubscriptionType.Exclusive;
}
}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java
new file mode 100644
index 0000000..a0801ec
--- /dev/null
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.pulsar.testutils.cases;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.connector.pulsar.source.PulsarSource;
+import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
+import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
+import static org.apache.pulsar.client.api.Schema.STRING;
+
+/**
+ * Pulsar external context template that will create multiple topics with only one partitions as
+ * source splits.
+ */
+public abstract class MultipleTopicTemplateContext extends PulsarTestContext<String> {
+ private static final long serialVersionUID = 7333807392445848344L;
+
+ private int numTopics = 0;
+
+ private final String topicPattern = "pulsar-multiple-topic-[0-9]+-" + randomAlphabetic(8);
+
+ private final Map<String, SourceSplitDataWriter<String>> topicNameToSplitWriters =
+ new HashMap<>();
+
+ public MultipleTopicTemplateContext(PulsarTestEnvironment environment) {
+ super(environment);
+ }
+
+ @Override
+ public Source<String, ?, ?> createSource(Boundedness boundedness) {
+ PulsarSourceBuilder<String> builder =
+ PulsarSource.builder()
+ .setDeserializationSchema(pulsarSchema(STRING))
+ .setServiceUrl(serviceUrl())
+ .setAdminUrl(adminUrl())
+ .setTopicPattern(topicPattern, RegexSubscriptionMode.AllTopics)
+ .setSubscriptionType(subscriptionType())
+ .setSubscriptionName(subscriptionName());
+ if (boundedness == Boundedness.BOUNDED) {
+ // Using latest stop cursor for making sure the source could be stopped.
+ // This is required for SourceTestSuiteBase.
+ builder.setBoundedStopCursor(StopCursor.latest());
+ }
+
+ return builder.build();
+ }
+
+ @Override
+ public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
+ String topicName = topicPattern.replace("[0-9]+", String.valueOf(numTopics));
+ operator.createTopic(topicName, 1);
+
+ String partitionName = TopicNameUtils.topicNameWithPartition(topicName, 0);
+ PulsarPartitionDataWriter writer = new PulsarPartitionDataWriter(operator, partitionName);
+
+ topicNameToSplitWriters.put(partitionName, writer);
+ numTopics++;
+
+ return writer;
+ }
+
+ @Override
+ public Collection<String> generateTestData(int splitIndex, long seed) {
+ return generateStringTestData(splitIndex, seed);
+ }
+
+ @Override
+ public void close() throws Exception {
+ for (SourceSplitDataWriter<String> writer : topicNameToSplitWriters.values()) {
+ writer.close();
+ }
+
+ topicNameToSplitWriters.clear();
+ }
+
+ protected abstract String subscriptionName();
+
+ protected abstract SubscriptionType subscriptionType();
+
+ protected String serviceUrl() {
+ return operator.serviceUrl();
+ }
+
+ protected String adminUrl() {
+ return operator.adminUrl();
+ }
+}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
index cb1b582..b89511c 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
@@ -23,7 +23,7 @@
import org.apache.flink.connector.pulsar.source.PulsarSource;
import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
-import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter;
import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
@@ -34,7 +34,6 @@
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
-import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.createFullRange;
import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
import static org.apache.pulsar.client.api.Schema.STRING;
import static org.apache.pulsar.client.api.SubscriptionType.Exclusive;
@@ -44,6 +43,7 @@
* source splits.
*/
public class SingleTopicConsumingContext extends PulsarTestContext<String> {
+ private static final long serialVersionUID = 2754642285356345741L;
private static final String TOPIC_NAME_PREFIX = "pulsar-single-topic";
private final String topicName;
@@ -53,12 +53,17 @@
private int numSplits = 0;
public SingleTopicConsumingContext(PulsarTestEnvironment environment) {
- super("consuming message on single topic", environment);
+ super(environment);
this.topicName =
TOPIC_NAME_PREFIX + "-" + ThreadLocalRandom.current().nextLong(Long.MAX_VALUE);
}
@Override
+ protected String displayName() {
+ return "consuming message on single topic";
+ }
+
+ @Override
public Source<String, ?, ?> createSource(Boundedness boundedness) {
PulsarSourceBuilder<String> builder =
PulsarSource.builder()
@@ -88,9 +93,8 @@
operator.increaseTopicPartitions(topicName, numSplits);
}
- TopicPartition partition = new TopicPartition(topicName, numSplits - 1, createFullRange());
- PulsarPartitionDataWriter writer =
- new PulsarPartitionDataWriter(operator.client(), partition);
+ String partitionName = TopicNameUtils.topicNameWithPartition(topicName, numSplits - 1);
+ PulsarPartitionDataWriter writer = new PulsarPartitionDataWriter(operator, partitionName);
partitionToSplitWriter.put(numSplits - 1, writer);
return writer;
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java
index 986f4bd..d46658e 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntime.java
@@ -18,33 +18,36 @@
package org.apache.flink.connector.pulsar.testutils.runtime;
-import org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerProvider;
-import org.apache.flink.connector.pulsar.testutils.runtime.mock.PulsarMockProvider;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime;
+import org.apache.flink.connector.pulsar.testutils.runtime.mock.PulsarMockRuntime;
-import java.util.function.Supplier;
+import org.testcontainers.containers.GenericContainer;
/**
- * A enum class for providing a operable pulsar runtime. We support two types of runtime, the
- * container and mock.
+ * An abstraction for different pulsar runtimes. Providing the common methods for {@link
+ * PulsarTestEnvironment}.
*/
-public enum PulsarRuntime {
+public interface PulsarRuntime {
- /**
- * The whole pulsar cluster would run in a docker container, provide the full fledged test
- * backend.
- */
- CONTAINER(PulsarContainerProvider::new),
+ /** Start up this pulsar runtime, block the thread until everytime is ready for this runtime. */
+ void startUp();
- /** The bookkeeper and zookeeper would use a mock backend, and start a single pulsar broker. */
- MOCK(PulsarMockProvider::new);
+ /** Shutdown this pulsar runtime. */
+ void tearDown();
- private final Supplier<PulsarRuntimeProvider> provider;
+ /** Return a operator for operating this pulsar runtime. */
+ PulsarRuntimeOperator operator();
- PulsarRuntime(Supplier<PulsarRuntimeProvider> provider) {
- this.provider = provider;
+ static PulsarRuntime mock() {
+ return new PulsarMockRuntime();
}
- public PulsarRuntimeProvider provider() {
- return provider.get();
+ static PulsarRuntime container() {
+ return new PulsarContainerRuntime();
+ }
+
+ static PulsarRuntime container(GenericContainer<?> flinkContainer) {
+ return new PulsarContainerRuntime().bindWithFlinkContainer(flinkContainer);
}
}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
index a68c065..2d26925 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeOperator.java
@@ -24,6 +24,8 @@
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
import org.apache.flink.connectors.test.common.external.ExternalContext;
+import org.apache.flink.shaded.guava30.com.google.common.base.Strings;
+
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.MessageId;
@@ -148,13 +150,30 @@
return messageIds.get(0);
}
+ public <T> MessageId sendMessage(String topic, Schema<T> schema, String key, T message) {
+ List<MessageId> messageIds = sendMessages(topic, schema, key, singletonList(message));
+ checkArgument(messageIds.size() == 1);
+
+ return messageIds.get(0);
+ }
+
public <T> List<MessageId> sendMessages(
String topic, Schema<T> schema, Collection<T> messages) {
+ return sendMessages(topic, schema, null, messages);
+ }
+
+ public <T> List<MessageId> sendMessages(
+ String topic, Schema<T> schema, String key, Collection<T> messages) {
try (Producer<T> producer = client().newProducer(schema).topic(topic).create()) {
List<MessageId> messageIds = new ArrayList<>(messages.size());
for (T message : messages) {
- MessageId messageId = producer.newMessage().value(message).send();
+ MessageId messageId;
+ if (Strings.isNullOrEmpty(key)) {
+ messageId = producer.newMessage().value(message).send();
+ } else {
+ messageId = producer.newMessage().key(key).value(message).send();
+ }
messageIds.add(messageId);
}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeProvider.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeProvider.java
deleted file mode 100644
index d8ad718..0000000
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/PulsarRuntimeProvider.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.pulsar.testutils.runtime;
-
-import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
-
-/**
- * A abstraction for different pulsar runtimes. Providing the common methods for {@link
- * PulsarTestEnvironment}.
- */
-public interface PulsarRuntimeProvider {
-
- /** Start up this pulsar runtime, block the thread until everytime is ready for this runtime. */
- void startUp();
-
- /** Shutdown this pulsar runtime. */
- void tearDown();
-
- /** Return a operator for operating this pulsar runtime. */
- PulsarRuntimeOperator operator();
-}
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerProvider.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
similarity index 75%
rename from flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerProvider.java
rename to flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
index 06be3cb..5560767 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerProvider.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/container/PulsarContainerRuntime.java
@@ -18,13 +18,14 @@
package org.apache.flink.connector.pulsar.testutils.runtime.container;
+import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
-import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeProvider;
import org.apache.flink.util.DockerImageVersions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.BindMode;
+import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
@@ -36,13 +37,22 @@
import static org.apache.flink.util.DockerImageVersions.PULSAR;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.testcontainers.containers.PulsarContainer.BROKER_HTTP_PORT;
+import static org.testcontainers.containers.PulsarContainer.BROKER_PORT;
/**
- * {@link PulsarRuntimeProvider} implementation, use the TestContainers as the backend. We would
- * start a pulsar container by this provider.
+ * {@link PulsarRuntime} implementation, use the TestContainers as the backend. We would start a
+ * pulsar container by this provider.
*/
-public class PulsarContainerProvider implements PulsarRuntimeProvider {
- private static final Logger LOG = LoggerFactory.getLogger(PulsarContainerProvider.class);
+public class PulsarContainerRuntime implements PulsarRuntime {
+ private static final Logger LOG = LoggerFactory.getLogger(PulsarContainerRuntime.class);
+ private static final String PULSAR_INTERNAL_HOSTNAME = "pulsar";
+
+ // This url is used on the container side.
+ public static final String PULSAR_SERVICE_URL =
+ String.format("pulsar://%s:%d", PULSAR_INTERNAL_HOSTNAME, BROKER_PORT);
+ // This url is used on the container side.
+ public static final String PULSAR_ADMIN_URL =
+ String.format("http://%s:%d", PULSAR_INTERNAL_HOSTNAME, BROKER_HTTP_PORT);
/**
* Create a pulsar container provider by a predefined version, this constance {@link
@@ -52,6 +62,14 @@
private PulsarRuntimeOperator operator;
+ public PulsarContainerRuntime bindWithFlinkContainer(GenericContainer<?> flinkContainer) {
+ this.container
+ .withNetworkAliases(PULSAR_INTERNAL_HOSTNAME)
+ .dependsOn(flinkContainer)
+ .withNetwork(flinkContainer.getNetwork());
+ return this;
+ }
+
@Override
public void startUp() {
// Prepare Pulsar Container.
diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockProvider.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
similarity index 76%
rename from flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockProvider.java
rename to flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
index 1cb8ce9..552ce42 100644
--- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockProvider.java
+++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/runtime/mock/PulsarMockRuntime.java
@@ -18,8 +18,8 @@
package org.apache.flink.connector.pulsar.testutils.runtime.mock;
+import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime;
import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
-import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeProvider;
import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
@@ -39,17 +39,17 @@
import static org.apache.flink.util.Preconditions.checkNotNull;
/** Providing a mocked pulsar server. */
-public class PulsarMockProvider implements PulsarRuntimeProvider {
+public class PulsarMockRuntime implements PulsarRuntime {
private static final String CLUSTER_NAME = "mock-pulsar-" + randomAlphanumeric(6);
private final MockPulsarService pulsarService;
private PulsarRuntimeOperator operator;
- public PulsarMockProvider() {
+ public PulsarMockRuntime() {
this(createConfig());
}
- public PulsarMockProvider(ServiceConfiguration configuration) {
+ public PulsarMockRuntime(ServiceConfiguration configuration) {
this.pulsarService = new MockPulsarService(configuration);
}
@@ -95,20 +95,34 @@
admin.clusters().createCluster(CLUSTER_NAME, data);
}
+ createOrUpdateTenant("public");
+ createOrUpdateNamespace("public", "default");
+
+ createOrUpdateTenant("pulsar");
+ createOrUpdateNamespace("pulsar", "system");
+ }
+
+ private void createOrUpdateTenant(String tenant) throws PulsarAdminException {
+ PulsarAdmin admin = operator().admin();
TenantInfo info =
TenantInfo.builder()
.adminRoles(ImmutableSet.of("appid1", "appid2"))
.allowedClusters(ImmutableSet.of(CLUSTER_NAME))
.build();
- if (!admin.tenants().getTenants().contains("public")) {
- admin.tenants().createTenant("public", info);
+ if (!admin.tenants().getTenants().contains(tenant)) {
+ admin.tenants().createTenant(tenant, info);
} else {
- admin.tenants().updateTenant("public", info);
+ admin.tenants().updateTenant(tenant, info);
}
+ }
- if (!admin.namespaces().getNamespaces("public").contains("public/default")) {
- admin.namespaces().createNamespace("public/default");
- admin.namespaces().setRetention("public/default", new RetentionPolicies(60, 1000));
+ public void createOrUpdateNamespace(String tenant, String namespace)
+ throws PulsarAdminException {
+ PulsarAdmin admin = operator().admin();
+ String namespaceValue = tenant + "/" + namespace;
+ if (!admin.namespaces().getNamespaces(tenant).contains(namespaceValue)) {
+ admin.namespaces().createNamespace(namespaceValue);
+ admin.namespaces().setRetention(namespaceValue, new RetentionPolicies(60, 1000));
}
}
@@ -135,6 +149,13 @@
configuration.setBrokerServicePort(Optional.of(findAvailablePort()));
configuration.setWebServicePort(Optional.of(findAvailablePort()));
+ // Enable transaction with in memory.
+ configuration.setTransactionCoordinatorEnabled(true);
+ configuration.setTransactionMetadataStoreProviderClassName(
+ "org.apache.pulsar.transaction.coordinator.impl.InMemTransactionMetadataStoreProvider");
+ configuration.setTransactionBufferProviderClassName(
+ "org.apache.pulsar.broker.transaction.buffer.impl.InMemTransactionBufferProvider");
+
return configuration;
}
}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainerTestEnvironment.java b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainerTestEnvironment.java
index 070bd82..9630003 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainerTestEnvironment.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainerTestEnvironment.java
@@ -42,10 +42,7 @@
public FlinkContainerTestEnvironment(
int numTaskManagers, int numSlotsPerTaskManager, String... jarPath) {
- Configuration flinkConfiguration = new Configuration();
- flinkConfiguration.set(HEARTBEAT_INTERVAL, 1000L);
- flinkConfiguration.set(HEARTBEAT_TIMEOUT, 5000L);
- flinkConfiguration.set(SLOT_REQUEST_TIMEOUT, 10000L);
+ Configuration flinkConfiguration = flinkConfiguration();
flinkConfiguration.set(NUM_TASK_SLOTS, numSlotsPerTaskManager);
this.flinkContainer =
@@ -113,4 +110,13 @@
public FlinkContainer getFlinkContainer() {
return this.flinkContainer;
}
+
+ protected Configuration flinkConfiguration() {
+ Configuration flinkConfiguration = new Configuration();
+ flinkConfiguration.set(HEARTBEAT_INTERVAL, 1000L);
+ flinkConfiguration.set(HEARTBEAT_TIMEOUT, 5000L);
+ flinkConfiguration.set(SLOT_REQUEST_TIMEOUT, 10000L);
+
+ return flinkConfiguration;
+ }
}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml
new file mode 100644
index 0000000..269f89c
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/pom.xml
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>flink-end-to-end-tests</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>1.15-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>flink-end-to-end-tests-pulsar</artifactId>
+ <name>Flink : E2E Tests : Pulsar</name>
+
+ <properties>
+ <pulsar.version>2.8.0</pulsar.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-end-to-end-tests-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-pulsar_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-pulsar_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>pulsar</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-dependency-plugin</artifactId>
+ <executions>
+ <execution>
+ <id>copy</id>
+ <phase>pre-integration-test</phase>
+ <goals>
+ <goal>copy</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-connector-pulsar_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <destFileName>pulsar-connector.jar</destFileName>
+ <type>jar</type>
+ <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client-all</artifactId>
+ <version>${pulsar.version}</version>
+ <destFileName>pulsar-client-all.jar</destFileName>
+ <type>jar</type>
+ <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client-api</artifactId>
+ <version>${pulsar.version}</version>
+ <destFileName>pulsar-client-api.jar</destFileName>
+ <type>jar</type>
+ <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-client-admin-api</artifactId>
+ <version>${pulsar.version}</version>
+ <destFileName>pulsar-admin-api.jar</destFileName>
+ <type>jar</type>
+ <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ <version>${slf4j.version}</version>
+ <destFileName>jul-to-slf4j.jar</destFileName>
+ <type>jar</type>
+ <outputDirectory>${project.build.directory}/dependencies</outputDirectory>
+ </artifactItem>
+ </artifactItems>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
new file mode 100644
index 0000000..1427c2b
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar;
+
+import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connectors.test.common.junit.annotations.ExternalContextFactory;
+import org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem;
+import org.apache.flink.connectors.test.common.junit.annotations.TestEnv;
+import org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase;
+import org.apache.flink.tests.util.pulsar.cases.ExclusiveSubscriptionContext;
+import org.apache.flink.tests.util.pulsar.cases.FailoverSubscriptionContext;
+import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
+
+import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime.container;
+
+/**
+ * Pulsar E2E test based on connector testing framework. It's used for Failover & Exclusive
+ * subscription.
+ */
+public class PulsarSourceOrderedE2ECase extends SourceTestSuiteBase<String> {
+
+ // Defines TestEnvironment.
+ @TestEnv
+ FlinkContainerWithPulsarEnvironment flink = new FlinkContainerWithPulsarEnvironment(1, 6);
+
+ // Defines ConnectorExternalSystem.
+ @ExternalSystem
+ PulsarTestEnvironment pulsar = new PulsarTestEnvironment(container(flink.getFlinkContainer()));
+
+ // Defines a set of external context Factories for different test cases.
+ @ExternalContextFactory
+ PulsarTestContextFactory<String, ExclusiveSubscriptionContext> exclusive =
+ new PulsarTestContextFactory<>(pulsar, ExclusiveSubscriptionContext::new);
+
+ @ExternalContextFactory
+ PulsarTestContextFactory<String, FailoverSubscriptionContext> failover =
+ new PulsarTestContextFactory<>(pulsar, FailoverSubscriptionContext::new);
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
new file mode 100644
index 0000000..25cab21
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar;
+
+import org.apache.flink.connector.pulsar.testutils.PulsarTestContextFactory;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connectors.test.common.junit.annotations.ExternalContextFactory;
+import org.apache.flink.connectors.test.common.junit.annotations.ExternalSystem;
+import org.apache.flink.connectors.test.common.junit.annotations.TestEnv;
+import org.apache.flink.tests.util.pulsar.cases.KeySharedSubscriptionContext;
+import org.apache.flink.tests.util.pulsar.cases.SharedSubscriptionContext;
+import org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
+import org.apache.flink.tests.util.pulsar.common.UnorderedSourceTestSuiteBase;
+
+import static org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime.container;
+
+/**
+ * Pulsar E2E test based on connector testing framework. It's used for Shared & Key_Shared
+ * subscription.
+ */
+public class PulsarSourceUnorderedE2ECase extends UnorderedSourceTestSuiteBase<String> {
+
+ // Defines TestEnvironment.
+ @TestEnv
+ FlinkContainerWithPulsarEnvironment flink = new FlinkContainerWithPulsarEnvironment(1, 8);
+
+ // Defines ConnectorExternalSystem.
+ @ExternalSystem
+ PulsarTestEnvironment pulsar = new PulsarTestEnvironment(container(flink.getFlinkContainer()));
+
+ // Defines a set of external context Factories for different test cases.
+ @ExternalContextFactory
+ PulsarTestContextFactory<String, SharedSubscriptionContext> shared =
+ new PulsarTestContextFactory<>(pulsar, SharedSubscriptionContext::new);
+
+ @ExternalContextFactory
+ PulsarTestContextFactory<String, KeySharedSubscriptionContext> keyShared =
+ new PulsarTestContextFactory<>(pulsar, KeySharedSubscriptionContext::new);
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
new file mode 100644
index 0000000..18b2ffc
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/ExclusiveSubscriptionContext.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar.cases;
+
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicTemplateContext;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
+import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
+
+/** We would consuming from test splits by using {@link SubscriptionType#Exclusive} subscription. */
+public class ExclusiveSubscriptionContext extends MultipleTopicTemplateContext {
+ private static final long serialVersionUID = 6238209089442257487L;
+
+ public ExclusiveSubscriptionContext(PulsarTestEnvironment environment) {
+ super(environment);
+ }
+
+ @Override
+ protected String displayName() {
+ return "consuming message by Exclusive";
+ }
+
+ @Override
+ protected String subscriptionName() {
+ return "pulsar-exclusive-subscription";
+ }
+
+ @Override
+ protected SubscriptionType subscriptionType() {
+ return SubscriptionType.Exclusive;
+ }
+
+ @Override
+ protected String serviceUrl() {
+ return PULSAR_SERVICE_URL;
+ }
+
+ @Override
+ protected String adminUrl() {
+ return PULSAR_ADMIN_URL;
+ }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
new file mode 100644
index 0000000..c322efa
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/FailoverSubscriptionContext.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar.cases;
+
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connector.pulsar.testutils.cases.MultipleTopicTemplateContext;
+
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
+import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
+
+/** We would consuming from test splits by using {@link SubscriptionType#Failover} subscription. */
+public class FailoverSubscriptionContext extends MultipleTopicTemplateContext {
+ private static final long serialVersionUID = 6238209089442257487L;
+
+ public FailoverSubscriptionContext(PulsarTestEnvironment environment) {
+ super(environment);
+ }
+
+ @Override
+ protected String displayName() {
+ return "consuming message by Failover";
+ }
+
+ @Override
+ protected String subscriptionName() {
+ return "pulsar-failover-subscription";
+ }
+
+ @Override
+ protected SubscriptionType subscriptionType() {
+ return SubscriptionType.Failover;
+ }
+
+ @Override
+ protected String serviceUrl() {
+ return PULSAR_SERVICE_URL;
+ }
+
+ @Override
+ protected String adminUrl() {
+ return PULSAR_ADMIN_URL;
+ }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
new file mode 100644
index 0000000..e442418
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar.cases;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.connector.pulsar.source.PulsarSource;
+import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.range.FixedRangeGenerator;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+import org.apache.flink.tests.util.pulsar.common.KeyedPulsarPartitionDataWriter;
+
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.util.Murmur3_32Hash;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static java.util.Collections.singletonList;
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
+import static org.apache.flink.connector.pulsar.source.enumerator.topic.TopicRange.RANGE_SIZE;
+import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
+import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
+import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
+import static org.apache.pulsar.client.api.Schema.STRING;
+
+/**
+ * We would consuming from test splits by using {@link SubscriptionType#Key_Shared} subscription.
+ */
+public class KeySharedSubscriptionContext extends PulsarTestContext<String> {
+ private static final long serialVersionUID = 3246516520107893983L;
+
+ private int index = 0;
+
+ private final List<KeyedPulsarPartitionDataWriter> writers = new ArrayList<>();
+
+ // Message keys.
+ private final String key1;
+ private final String key2;
+
+ public KeySharedSubscriptionContext(PulsarTestEnvironment environment) {
+ super(environment);
+
+ // Init message keys.
+ this.key1 = randomAlphabetic(8);
+ String newKey2;
+ do {
+ newKey2 = randomAlphabetic(8);
+ } while (keyHash(key1) == keyHash(newKey2));
+ this.key2 = newKey2;
+ }
+
+ @Override
+ protected String displayName() {
+ return "consuming message by Key_Shared";
+ }
+
+ @Override
+ public Source<String, ?, ?> createSource(Boundedness boundedness) {
+ int keyHash = keyHash(key1);
+ TopicRange range = new TopicRange(keyHash, keyHash);
+
+ PulsarSourceBuilder<String> builder =
+ PulsarSource.builder()
+ .setDeserializationSchema(pulsarSchema(STRING))
+ .setServiceUrl(PULSAR_SERVICE_URL)
+ .setAdminUrl(PULSAR_ADMIN_URL)
+ .setTopicPattern(
+ "pulsar-[0-9]+-key-shared", RegexSubscriptionMode.AllTopics)
+ .setSubscriptionType(SubscriptionType.Key_Shared)
+ .setSubscriptionName("pulsar-key-shared")
+ .setRangeGenerator(new FixedRangeGenerator(singletonList(range)));
+ if (boundedness == Boundedness.BOUNDED) {
+ // Using latest stop cursor for making sure the source could be stopped.
+ builder.setBoundedStopCursor(StopCursor.latest());
+ }
+
+ return builder.build();
+ }
+
+ @Override
+ public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
+ String topicName = "pulsar-" + index + "-key-shared";
+ operator.createTopic(topicName, 1);
+ index++;
+
+ String partitionName = TopicNameUtils.topicNameWithPartition(topicName, 0);
+ KeyedPulsarPartitionDataWriter writer =
+ new KeyedPulsarPartitionDataWriter(operator, partitionName, key1, key2);
+ writers.add(writer);
+
+ return writer;
+ }
+
+ @Override
+ public Collection<String> generateTestData(int splitIndex, long seed) {
+ return generateStringTestData(splitIndex, seed);
+ }
+
+ @Override
+ public void close() {
+ for (KeyedPulsarPartitionDataWriter writer : writers) {
+ writer.close();
+ }
+ writers.clear();
+ }
+
+ private int keyHash(String key) {
+ return Murmur3_32Hash.getInstance().makeHash(key.getBytes()) % RANGE_SIZE;
+ }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
new file mode 100644
index 0000000..f936b6f
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar.cases;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.connector.pulsar.source.PulsarSource;
+import org.apache.flink.connector.pulsar.source.PulsarSourceBuilder;
+import org.apache.flink.connector.pulsar.source.enumerator.cursor.StopCursor;
+import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicNameUtils;
+import org.apache.flink.connector.pulsar.testutils.PulsarPartitionDataWriter;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
+import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.SubscriptionType;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
+import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_ADMIN_URL;
+import static org.apache.flink.connector.pulsar.testutils.runtime.container.PulsarContainerRuntime.PULSAR_SERVICE_URL;
+import static org.apache.pulsar.client.api.Schema.STRING;
+
+/** We would consuming from test splits by using {@link SubscriptionType#Shared} subscription. */
+public class SharedSubscriptionContext extends PulsarTestContext<String> {
+ private static final long serialVersionUID = -2798707923661295245L;
+
+ private int index = 0;
+
+ private final List<PulsarPartitionDataWriter> writers = new ArrayList<>();
+
+ public SharedSubscriptionContext(PulsarTestEnvironment environment) {
+ super(environment);
+ }
+
+ @Override
+ protected String displayName() {
+ return "consuming message by Shared";
+ }
+
+ @Override
+ public Source<String, ?, ?> createSource(Boundedness boundedness) {
+ PulsarSourceBuilder<String> builder =
+ PulsarSource.builder()
+ .setDeserializationSchema(pulsarSchema(STRING))
+ .setServiceUrl(PULSAR_SERVICE_URL)
+ .setAdminUrl(PULSAR_ADMIN_URL)
+ .setTopicPattern("pulsar-[0-9]+-shared", RegexSubscriptionMode.AllTopics)
+ .setSubscriptionType(SubscriptionType.Shared)
+ .setSubscriptionName("pulsar-shared");
+ if (boundedness == Boundedness.BOUNDED) {
+ // Using latest stop cursor for making sure the source could be stopped.
+ builder.setBoundedStopCursor(StopCursor.latest());
+ }
+
+ return builder.build();
+ }
+
+ @Override
+ public SourceSplitDataWriter<String> createSourceSplitDataWriter() {
+ String topicName = "pulsar-" + index + "-shared";
+ operator.createTopic(topicName, 1);
+ index++;
+
+ String partitionName = TopicNameUtils.topicNameWithPartition(topicName, 0);
+ PulsarPartitionDataWriter writer = new PulsarPartitionDataWriter(operator, partitionName);
+ writers.add(writer);
+
+ return writer;
+ }
+
+ @Override
+ public Collection<String> generateTestData(int splitIndex, long seed) {
+ return generateStringTestData(splitIndex, seed);
+ }
+
+ @Override
+ public void close() {
+ for (PulsarPartitionDataWriter writer : writers) {
+ writer.close();
+ }
+ writers.clear();
+ }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
new file mode 100644
index 0000000..890d09e
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/FlinkContainerWithPulsarEnvironment.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar.common;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.tests.util.TestUtils;
+import org.apache.flink.tests.util.flink.FlinkContainerTestEnvironment;
+
+import static org.apache.flink.configuration.TaskManagerOptions.TASK_OFF_HEAP_MEMORY;
+
+/** A Flink Container which would bundles pulsar connector in its classpath. */
+public class FlinkContainerWithPulsarEnvironment extends FlinkContainerTestEnvironment {
+
+ public FlinkContainerWithPulsarEnvironment(int numTaskManagers, int numSlotsPerTaskManager) {
+ super(
+ numTaskManagers,
+ numSlotsPerTaskManager,
+ resourcePath("pulsar-connector.jar"),
+ resourcePath("pulsar-client-all.jar"),
+ resourcePath("pulsar-client-api.jar"),
+ resourcePath("pulsar-admin-api.jar"),
+ resourcePath("jul-to-slf4j.jar"));
+ }
+
+ private static String resourcePath(String jarName) {
+ return TestUtils.getResource(jarName).toAbsolutePath().toString();
+ }
+
+ @Override
+ protected Configuration flinkConfiguration() {
+ Configuration configuration = super.flinkConfiguration();
+ // Increase the off heap memory for avoiding direct buffer memory error on Pulsar e2e tests.
+ configuration.set(TASK_OFF_HEAP_MEMORY, MemorySize.ofMebiBytes(100));
+
+ return configuration;
+ }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
new file mode 100644
index 0000000..eea97e6
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/KeyedPulsarPartitionDataWriter.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar.common;
+
+import org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntimeOperator;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+
+import org.apache.pulsar.client.api.Schema;
+
+import java.util.Collection;
+import java.util.List;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Source split data writer for writing test data into a Pulsar topic partition. It will write the
+ * message with two keys.
+ */
+public class KeyedPulsarPartitionDataWriter implements SourceSplitDataWriter<String> {
+
+ private final PulsarRuntimeOperator operator;
+ private final String fullTopicName;
+ private final String key1;
+ private final String key2;
+
+ public KeyedPulsarPartitionDataWriter(
+ PulsarRuntimeOperator operator, String fullTopicName, String key1, String key2) {
+ this.operator = operator;
+ this.fullTopicName = fullTopicName;
+ this.key1 = key1;
+ this.key2 = key2;
+ }
+
+ @Override
+ public void writeRecords(Collection<String> records) {
+ operator.sendMessages(fullTopicName, Schema.STRING, key1, records);
+
+ List<String> newRecords = records.stream().map(a -> a + key1).collect(toList());
+ operator.sendMessages(fullTopicName, Schema.STRING, key2, newRecords);
+ }
+
+ @Override
+ public void close() {
+ // Nothing to do.
+ }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java
new file mode 100644
index 0000000..c452fe6
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/common/UnorderedSourceTestSuiteBase.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.tests.util.pulsar.common;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.connectors.test.common.environment.TestEnvironment;
+import org.apache.flink.connectors.test.common.external.ExternalContext;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+import org.apache.flink.connectors.test.common.junit.extensions.ConnectorTestingExtension;
+import org.apache.flink.connectors.test.common.junit.extensions.TestCaseInvocationContextProvider;
+import org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+/** A source test template for testing the messages which could be consumed in a unordered way. */
+@ExtendWith({
+ ConnectorTestingExtension.class,
+ TestLoggerExtension.class,
+ TestCaseInvocationContextProvider.class
+})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public abstract class UnorderedSourceTestSuiteBase<T> {
+
+ @TestTemplate
+ @DisplayName("Test source with one split and four consumers")
+ public void testOneSplitWithMultipleConsumers(
+ TestEnvironment testEnv, ExternalContext<T> externalContext) throws Exception {
+ Collection<T> testData =
+ externalContext.generateTestData(0, ThreadLocalRandom.current().nextLong());
+ SourceSplitDataWriter<T> writer = externalContext.createSourceSplitDataWriter();
+ writer.writeRecords(testData);
+
+ Source<T, ?, ?> source = externalContext.createSource(Boundedness.BOUNDED);
+ StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment();
+ List<T> results =
+ execEnv.fromSource(source, WatermarkStrategy.noWatermarks(), "Pulsar source")
+ .setParallelism(4)
+ .executeAndCollect(
+ "Source single split with four readers.", testData.size());
+
+ assertThat(results, containsInAnyOrder(testData.toArray()));
+ }
+}
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/resources/log4j2-test.properties b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/resources/log4j2-test.properties
new file mode 100644
index 0000000..835c2ec
--- /dev/null
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c %x - %m%n
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 8aada21..84ca52e 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -88,6 +88,7 @@
<module>flink-python-test</module>
<module>flink-end-to-end-tests-hbase</module>
<module>flink-glue-schema-registry-test</module>
+ <module>flink-end-to-end-tests-pulsar</module>
</modules>
<dependencyManagement>
diff --git a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist b/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist
index 521d055..110ba08 100644
--- a/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist
+++ b/tools/ci/java-ci-tools/src/main/resources/modules-skipping-deployment.modulelist
@@ -39,3 +39,4 @@
flink-elasticsearch5-test
flink-high-parallelism-iterations-test
flink-end-to-end-tests-common-kafka
+flink-end-to-end-tests-pulsar