[documentation][example] Flink Source & Sink Connector (#2561)
### Motivation
We added flink source connector (#2441) and sink connector (#2434). It would be great to an example to show how to use flink source & sink connector.
### Modifications
- introduce an `examples` module
- introduce an `examples/flink-consumer-source` module
- add a word count example to use flink source and sink connector
### Result
be able to know how to use flink source & sink connector
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index bddfee4..2324c55 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -97,10 +97,7 @@
/**
* The callback than handles error propagation or logging callbacks.
*/
- protected transient Function<MessageId, MessageId> successCallback = msgId -> {
- acknowledgeMessage();
- return msgId;
- };
+ protected transient Function<MessageId, MessageId> successCallback;
protected transient Function<Throwable, MessageId> failureCallback;
@@ -205,6 +202,11 @@
flushOnCheckpoint = false;
}
+ this.successCallback = msgId -> {
+ acknowledgeMessage();
+ return msgId;
+ };
+
if (PulsarProduceMode.AT_MOST_ONCE == produceMode) {
this.failureCallback = cause -> {
LOG.error("Error while sending record to Pulsar : " + cause.getMessage(), cause);
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
index f1b2595..0d01def 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
@@ -127,7 +127,6 @@
while (isRunning) {
message = consumer.receive(messageReceiveTimeoutMs, TimeUnit.MILLISECONDS);
if (message == null) {
- LOG.info("unexpected null message");
continue;
}
diff --git a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
index 90dc21c..270892e 100644
--- a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
+++ b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/partitioner/PulsarKeyExtractor.java
@@ -18,10 +18,12 @@
*/
package org.apache.flink.streaming.connectors.pulsar.partitioner;
+import java.io.Serializable;
+
/**
* Extract key from a value.
*/
-public interface PulsarKeyExtractor<IN> {
+public interface PulsarKeyExtractor<IN> extends Serializable {
PulsarKeyExtractor NULL = in -> null;