Add missed serialization schema argument for PulsarOutputFormat constructor (#4373)

The constructor of PulsarOutputFormat expects a serializationSchema passed in otherwise it's member serializationSchema will be assigned by itself. 
diff --git a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
index c2fb984..cb307f0 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormat.java
@@ -37,7 +37,9 @@
         this.serializationSchema = serializationSchema;
     }
 
-    public PulsarOutputFormat(ClientConfigurationData clientConfigurationData, ProducerConfigurationData producerConfigurationData) {
+    public PulsarOutputFormat(final ClientConfigurationData clientConfigurationData,
+                              final ProducerConfigurationData producerConfigurationData,
+                              final SerializationSchema<T> serializationSchema) {
         super(clientConfigurationData, producerConfigurationData);
         Preconditions.checkNotNull(serializationSchema, "serializationSchema cannot be null.");
         this.serializationSchema = serializationSchema;
diff --git a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
index a841067..9c9a2fe 100644
--- a/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
+++ b/pulsar-flink/src/test/java/org/apache/flink/batch/connectors/pulsar/PulsarOutputFormatTest.java
@@ -72,7 +72,7 @@
                 .topicName("testTopic")
                 .build();
 
-        new PulsarOutputFormat(clientConf, producerConf);
+        new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
@@ -85,7 +85,7 @@
                 .topicName(null)
                 .build();
 
-        new PulsarOutputFormat(clientConf, producerConf);
+        new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
@@ -98,7 +98,7 @@
                 .topicName(StringUtils.EMPTY)
                 .build();
 
-        new PulsarOutputFormat(clientConf, producerConf);
+        new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
@@ -111,7 +111,18 @@
                 .topicName("testTopic")
                 .build();
 
-        new PulsarOutputFormat(clientConf, producerConf);
+        new PulsarOutputFormat(clientConf, producerConf, text -> text.toString().getBytes());
+    }
+
+    @Test(expectedExceptions = NullPointerException.class)
+    public void testPulsarOutputFormatConstructorV2WhenSerializationSchemaIsNull() {
+        ClientConfigurationData clientConf = ClientConfigurationData.builder()
+                .serviceUrl("testServiceUrl")
+                .build();
+        ProducerConfigurationData producerConf = ProducerConfigurationData.builder()
+                .topicName("testTopic")
+                .build();
+        new PulsarOutputFormat(clientConf, producerConf, null);
     }
 
     @Test