[documentation][example] Flink Source & Sink Connector (#2561)

### Motivation

We added flink source connector (#2441) and sink connector (#2434). It would be great to an example to show how to use flink source & sink connector.

### Modifications

- introduce an `examples` module
- introduce an `examples/flink-consumer-source` module
- add a word count example to use flink source and sink connector

### Result

be able to know how to use flink source & sink connector
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index bddfee4..2324c55 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -97,10 +97,7 @@
     /**
      * The callback than handles error propagation or logging callbacks.
      */
-    protected transient Function<MessageId, MessageId> successCallback = msgId -> {
-        acknowledgeMessage();
-        return msgId;
-    };
+    protected transient Function<MessageId, MessageId> successCallback;
 
     protected transient Function<Throwable, MessageId> failureCallback;
 
@@ -205,6 +202,11 @@
             flushOnCheckpoint = false;
         }
 
+        this.successCallback =  msgId -> {
+            acknowledgeMessage();
+            return msgId;
+        };
+
         if (PulsarProduceMode.AT_MOST_ONCE == produceMode) {
             this.failureCallback = cause -> {
                 LOG.error("Error while sending record to Pulsar : " + cause.getMessage(), cause);
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
index f1b2595..0d01def 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
@@ -127,7 +127,6 @@
         while (isRunning) {
             message = consumer.receive(messageReceiveTimeoutMs, TimeUnit.MILLISECONDS);
             if (message == null) {
-                LOG.info("unexpected null message");
                 continue;
             }
 
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
index 90dc21c..270892e 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
@@ -18,10 +18,12 @@
  */
 package org.apache.flink.streaming.connectors.pulsar.partitioner;
 
+import java.io.Serializable;
+
 /**
  * Extract key from a value.
  */
-public interface PulsarKeyExtractor<IN> {
+public interface PulsarKeyExtractor<IN> extends Serializable {
 
     PulsarKeyExtractor NULL = in -> null;