Add missed serialization schema argument for PulsarOutputFormat constructor (#4373)
The constructor of PulsarOutputFormat expects a serializationSchema passed in otherwise it's member serializationSchema will be assigned by itself.
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
index c2fb984..cb307f0 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
@@ -37,7 +37,9 @@
this.serializationSchema = serializationSchema;
}
- public PulsarOutputFormat(ClientConfigurationData clientConfigurationData, ProducerConfigurationData producerConfigurationData) {
+ public PulsarOutputFormat(final ClientConfigurationData clientConfigurationData,
+ final ProducerConfigurationData producerConfigurationData,
+ final SerializationSchema<T> serializationSchema) {
super(clientConfigurationData, producerConfigurationData);
Preconditions.checkNotNull(serializationSchema, "serializationSchema cannot be null.");
this.serializationSchema = serializationSchema;
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
index a841067..9c9a2fe 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
@@ -72,7 +72,7 @@
.topicName("testTopic")
.build();
- new PulsarOutputFormat(clientConf, producerConf);
+ new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
}
@Test(expectedExceptions = IllegalArgumentException.class)
@@ -85,7 +85,7 @@
.topicName(null)
.build();
- new PulsarOutputFormat(clientConf, producerConf);
+ new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
}
@Test(expectedExceptions = IllegalArgumentException.class)
@@ -98,7 +98,7 @@
.topicName(StringUtils.EMPTY)
.build();
- new PulsarOutputFormat(clientConf, producerConf);
+ new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
}
@Test(expectedExceptions = IllegalArgumentException.class)
@@ -111,7 +111,18 @@
.topicName("testTopic")
.build();
- new PulsarOutputFormat(clientConf, producerConf);
+ new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
+ }
+
+ @Test(expectedExceptions = NullPointerException.class)
+ public void testPulsarOutputFormatConstructorV2WhenSerializationSchemaIsNull() {
+ ClientConfigurationData clientConf = ClientConfigurationData.builder()
+ .serviceUrl("testServiceUrl")
+ .build();
+ ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
+ .topicName("testTopic")
+ .build();
+ new PulsarOutputFormat(clientConf, producerConf, null);
}
@Test