Add a source connector for MongoDB (#5316)

* Add a source connector for MongoDB

* Fix expected license header
diff --git a/pulsar-io/mongo/pom.xml b/pulsar-io/mongo/pom.xml
index 1487b57..de09599 100644
--- a/pulsar-io/mongo/pom.xml
+++ b/pulsar-io/mongo/pom.xml
@@ -33,7 +33,7 @@
   <name>Pulsar IO :: MongoDB</name>
 
   <properties>
-    <mongo-driver.version>3.8.2</mongo-driver.version>
+    <mongo-reactivestreams.version>1.12.0</mongo-reactivestreams.version>
   </properties>
 
   <dependencies>
@@ -44,8 +44,8 @@
     </dependency>
     <dependency>
       <groupId>org.mongodb</groupId>
-      <artifactId>mongodb-driver-async</artifactId>
-      <version>${mongo-driver.version}</version>
+      <artifactId>mongodb-driver-reactivestreams</artifactId>
+      <version>${mongo-reactivestreams.version}</version>
     </dependency>
     <dependency>
       <groupId>org.apache.commons</groupId>
diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoConfig.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoConfig.java
index 602c105..7585de1 100644
--- a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoConfig.java
+++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoConfig.java
@@ -32,7 +32,7 @@
 import java.util.Map;
 
 /**
- * Configuration class for the MongoDB Sink Connector.
+ * Configuration class for the MongoDB Connectors.
  */
 @Data
 @Setter
@@ -57,30 +57,28 @@
     private String mongoUri;
 
     @FieldDoc(
-        required = true,
         defaultValue = "",
-        help = "The name of the database to which the collection belongs to"
+        help = "The database name to which the collection belongs and which must be watched for the source connector"
+                + " (required for the sink connector)"
     )
     private String database;
 
     @FieldDoc(
-        required = true,
         defaultValue = "",
-        help = "The collection name that the connector writes messages to"
+        help = "The collection name where the messages are written or which is watched for the source connector"
+                + " (required for the sink connector)"
     )
     private String collection;
 
     @FieldDoc(
-        required = false,
         defaultValue = "" + DEFAULT_BATCH_SIZE,
-        help = "The batch size of write to the collection"
+        help = "The batch size of write to or read from the database"
     )
     private int batchSize = DEFAULT_BATCH_SIZE;
 
     @FieldDoc(
-            required = false,
-            defaultValue = "" + DEFAULT_BATCH_TIME_MS,
-            help = "The batch operation interval in milliseconds")
+        defaultValue = "" + DEFAULT_BATCH_TIME_MS,
+        help = "The batch operation interval in milliseconds")
     private long batchTimeMs = DEFAULT_BATCH_TIME_MS;
 
 
@@ -98,12 +96,15 @@
         return cfg;
     }
 
-    public void validate() {
-        if (StringUtils.isEmpty(mongoUri) || StringUtils.isEmpty(database) || StringUtils.isEmpty(collection)) {
+    public void validate(boolean dbRequired, boolean collectionRequired) {
+        if (StringUtils.isEmpty(getMongoUri()) ||
+                (dbRequired && StringUtils.isEmpty(getDatabase())) ||
+                (collectionRequired && StringUtils.isEmpty(getCollection()))) {
+
             throw new IllegalArgumentException("Required property not set.");
         }
 
-        Preconditions.checkArgument(batchSize > 0, "batchSize must be a positive integer.");
-        Preconditions.checkArgument(batchTimeMs > 0, "batchTimeMs must be a positive long.");
+        Preconditions.checkArgument(getBatchSize() > 0, "batchSize must be a positive integer.");
+        Preconditions.checkArgument(getBatchTimeMs() > 0, "batchTimeMs must be a positive long.");
     }
 }
diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
index 7dc665d..2bfcc33 100644
--- a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
+++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
@@ -18,14 +18,21 @@
  */
 package org.apache.pulsar.io.mongodb;
 
-import static java.util.stream.Collectors.toList;
-
 import com.google.common.collect.Lists;
 import com.mongodb.MongoBulkWriteException;
 import com.mongodb.async.client.MongoClient;
 import com.mongodb.async.client.MongoClients;
 import com.mongodb.async.client.MongoCollection;
 import com.mongodb.async.client.MongoDatabase;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+import org.bson.BSONException;
+import org.bson.Document;
+import org.bson.json.JsonParseException;
 
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -38,16 +45,7 @@
 import java.util.function.Supplier;
 import java.util.stream.IntStream;
 
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.core.SinkContext;
-import org.apache.pulsar.io.core.annotations.Connector;
-import org.apache.pulsar.io.core.annotations.IOType;
-import org.bson.BSONException;
-import org.bson.Document;
-import org.bson.json.JsonParseException;
+import static java.util.stream.Collectors.toList;
 
 /**
  * The base class for MongoDB sinks.
@@ -88,7 +86,7 @@
         log.info("Open MongoDB Sink");
 
         mongoConfig = MongoConfig.load(config);
-        mongoConfig.validate();
+        mongoConfig.validate(true, true);
 
         if (clientProvider != null) {
             mongoClient = clientProvider.get();
diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java
new file mode 100644
index 0000000..2ce1475
--- /dev/null
+++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.mongodb;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.mongodb.client.model.changestream.ChangeStreamDocument;
+import com.mongodb.client.model.changestream.FullDocument;
+import com.mongodb.reactivestreams.client.*;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.PushSource;
+import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+import org.bson.Document;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import java.io.UnsupportedEncodingException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+/**
+ * The base class for MongoDB sources.
+ */
+@Connector(
+        name = "mongo",
+        type = IOType.SOURCE,
+        help = "A source connector that sends mongodb documents to pulsar",
+        configClass = MongoConfig.class
+)
+@Slf4j
+public class MongoSource extends PushSource<byte[]> {
+
+    private final Supplier<MongoClient> clientProvider;
+
+    private MongoConfig mongoConfig;
+
+    private MongoClient mongoClient;
+
+    private Thread streamThread;
+
+    private ChangeStreamPublisher<Document> stream;
+
+
+    public MongoSource() {
+        this(null);
+    }
+
+    public MongoSource(Supplier<MongoClient> clientProvider) {
+        this.clientProvider = clientProvider;
+    }
+
+    @Override
+    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+        log.info("Open MongoDB Source");
+
+        mongoConfig = MongoConfig.load(config);
+        mongoConfig.validate(false, false);
+
+        if (clientProvider != null) {
+            mongoClient = clientProvider.get();
+        } else {
+            mongoClient = MongoClients.create(mongoConfig.getMongoUri());
+        }
+
+        if (StringUtils.isEmpty(mongoConfig.getDatabase())) {
+            // Watch all databases
+            log.info("Watch all");
+            stream = mongoClient.watch();
+
+        } else {
+            final MongoDatabase db = mongoClient.getDatabase(mongoConfig.getDatabase());
+
+            if (StringUtils.isEmpty(mongoConfig.getCollection())) {
+                // Watch all collections in a database
+                log.info("Watch db: {}", db.getName());
+                stream = db.watch();
+
+            } else {
+                // Watch a collection
+
+                final MongoCollection<Document> collection = db.getCollection(mongoConfig.getCollection());
+                log.info("Watch collection: {} {}", db.getName(), mongoConfig.getCollection());
+                stream = collection.watch();
+            }
+        }
+
+        stream.batchSize(mongoConfig.getBatchSize()).fullDocument(FullDocument.UPDATE_LOOKUP);
+
+        stream.subscribe(new Subscriber<ChangeStreamDocument<Document>>() {
+            private ObjectMapper mapper = new ObjectMapper();
+            private Subscription subscription;
+
+            @Override
+            public void onSubscribe(Subscription subscription) {
+                this.subscription = subscription;
+                this.subscription.request(Integer.MAX_VALUE);
+            }
+
+            @Override
+            public void onNext(ChangeStreamDocument<Document> doc) {
+                try {
+                    log.info("New change doc: {}", doc);
+
+                    // Build a record with the essential information
+                    final Map<String, Object> recordValue = new HashMap<>();
+                    recordValue.put("fullDocument", doc.getFullDocument());
+                    recordValue.put("ns", doc.getNamespace());
+                    recordValue.put("operation", doc.getOperationType());
+
+                    consume(new DocRecord(
+                            Optional.of(doc.getDocumentKey().toJson()),
+                            mapper.writeValueAsString(recordValue).getBytes("UTF-8")));
+
+                } catch (UnsupportedEncodingException | JsonProcessingException e) {
+                    log.error("Processing doc from mongo", e);
+                }
+            }
+
+            @Override
+            public void onError(Throwable error) {
+                log.error("Subscriber error", error);
+            }
+
+            @Override
+            public void onComplete() {
+                log.info("Subscriber complete");
+            }
+        });
+
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (mongoClient != null) {
+            mongoClient.close();
+        }
+    }
+
+    @Data
+    private static class DocRecord implements Record<byte[]> {
+        private final Optional<String> key;
+        private final byte[] value;
+    }
+}
diff --git a/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml
index a1cf293..bc32292 100644
--- a/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -17,5 +17,6 @@
 # under the License.
 #
 name: mongo
-description: Writes data into MongoDB
+description: MongoDB source and sink connector
 sinkClass: org.apache.pulsar.io.mongodb.MongoSink
+sourceClass: org.apache.pulsar.io.mongodb.MongoSource
diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoConfigTest.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoConfigTest.java
index 9f98ee4..8495d87 100644
--- a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoConfigTest.java
+++ b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoConfigTest.java
@@ -50,7 +50,7 @@
         final Map<String, Object> map = TestHelper.createMap(false);
         final MongoConfig cfg = MongoConfig.load(map);
 
-        cfg.validate();
+        cfg.validate(true, true);
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class,
@@ -60,7 +60,7 @@
         map.put("batchSize", 0);
         final MongoConfig cfg = MongoConfig.load(map);
 
-        cfg.validate();
+        cfg.validate(true, true);
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class,
@@ -70,7 +70,7 @@
         map.put("batchTimeMs", 0);
         final MongoConfig cfg = MongoConfig.load(map);
 
-        cfg.validate();
+        cfg.validate(true, true);
     }
 
     @Test
diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceTest.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceTest.java
new file mode 100644
index 0000000..63e07e2
--- /dev/null
+++ b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.mongodb;
+
+import com.mongodb.MongoNamespace;
+import com.mongodb.client.model.changestream.ChangeStreamDocument;
+import com.mongodb.client.model.changestream.OperationType;
+import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
+import com.mongodb.reactivestreams.client.MongoClient;
+import com.mongodb.reactivestreams.client.MongoCollection;
+import com.mongodb.reactivestreams.client.MongoDatabase;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SourceContext;
+import org.bson.BsonDocument;
+import org.bson.BsonString;
+import org.bson.Document;
+import org.mockito.Mock;
+import org.reactivestreams.Subscriber;
+import org.testng.IObjectFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.assertEquals;
+
+public class MongoSourceTest {
+
+    @Mock
+    private SourceContext mockSourceContext;
+
+    @Mock
+    private MongoClient mockMongoClient;
+
+    @Mock
+    private MongoDatabase mockMongoDb;
+
+    @Mock
+    private MongoCollection mockMongoColl;
+
+    @Mock
+    private ChangeStreamPublisher mockPublisher;
+
+    private Subscriber subscriber;
+
+    private MongoSource source;
+
+    private Map<String, Object> map;
+
+
+    @ObjectFactory
+    public IObjectFactory getObjectFactory() {
+        return new org.powermock.modules.testng.PowerMockObjectFactory();
+    }
+
+    @BeforeMethod
+    public void setUp() {
+
+        map = TestHelper.createMap(true);
+
+        mockSourceContext = mock(SourceContext.class);
+        mockMongoClient = mock(MongoClient.class);
+        mockMongoDb = mock(MongoDatabase.class);
+        mockMongoColl = mock(MongoCollection.class);
+        mockPublisher = mock(ChangeStreamPublisher.class);
+
+        source = new MongoSource(() -> mockMongoClient);
+
+        when(mockMongoClient.getDatabase(anyString())).thenReturn(mockMongoDb);
+        when(mockMongoDb.getCollection(anyString())).thenReturn(mockMongoColl);
+        when(mockMongoColl.watch()).thenReturn(mockPublisher);
+        when(mockPublisher.batchSize(anyInt())).thenReturn(mockPublisher);
+        when(mockPublisher.fullDocument(any())).thenReturn(mockPublisher);
+
+        doAnswer((invocation) -> {
+            subscriber = invocation.getArgument(0, Subscriber.class);
+            return null;
+        }).when(mockPublisher).subscribe(any());
+    }
+
+    @AfterMethod
+    public void tearDown() throws Exception {
+        source.close();
+        verify(mockMongoClient, times(1)).close();
+    }
+
+    @Test
+    public void testOpen() throws Exception {
+        source.open(map, mockSourceContext);
+    }
+
+    @Test
+    public void testWriteBadMessage() throws Exception {
+
+        source.open(map, mockSourceContext);
+
+        subscriber.onNext(new ChangeStreamDocument<Document>(null, new MongoNamespace("hello.pulsar"),
+                new Document("hello", "pulsar"), new BsonDocument("_id", new BsonString("id")), OperationType.INSERT, null));
+
+        Record<byte[]> record = source.read();
+
+        assertEquals(new String(record.getValue()),
+                "{\"fullDocument\":{\"hello\":\"pulsar\"},"
+                + "\"ns\":{\"databaseName\":\"hello\",\"collectionName\":\"pulsar\",\"fullName\":\"hello.pulsar\"},"
+                + "\"operation\":\"INSERT\"}");
+    }
+}