Add a source connector for MongoDB (#5316)
* Add a source connector for MongoDB
* Fix expected license header
diff --git a/pulsar-io/mongo/pom.xml b/pulsar-io/mongo/pom.xml
index 1487b57..de09599 100644
--- a/pulsar-io/mongo/pom.xml
+++ b/pulsar-io/mongo/pom.xml
@@ -33,7 +33,7 @@
<name>Pulsar IO :: MongoDB</name>
<properties>
- <mongo-driver.version>3.8.2</mongo-driver.version>
+ <mongo-reactivestreams.version>1.12.0</mongo-reactivestreams.version>
</properties>
<dependencies>
@@ -44,8 +44,8 @@
</dependency>
<dependency>
<groupId>org.mongodb</groupId>
- <artifactId>mongodb-driver-async</artifactId>
- <version>${mongo-driver.version}</version>
+ <artifactId>mongodb-driver-reactivestreams</artifactId>
+ <version>${mongo-reactivestreams.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoConfig.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoConfig.java
index 602c105..7585de1 100644
--- a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoConfig.java
+++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoConfig.java
@@ -32,7 +32,7 @@
import java.util.Map;
/**
- * Configuration class for the MongoDB Sink Connector.
+ * Configuration class for the MongoDB Connectors.
*/
@Data
@Setter
@@ -57,30 +57,28 @@
private String mongoUri;
@FieldDoc(
- required = true,
defaultValue = "",
- help = "The name of the database to which the collection belongs to"
+ help = "The database name to which the collection belongs and which must be watched for the source connector"
+ + " (required for the sink connector)"
)
private String database;
@FieldDoc(
- required = true,
defaultValue = "",
- help = "The collection name that the connector writes messages to"
+ help = "The collection name where the messages are written or which is watched for the source connector"
+ + " (required for the sink connector)"
)
private String collection;
@FieldDoc(
- required = false,
defaultValue = "" + DEFAULT_BATCH_SIZE,
- help = "The batch size of write to the collection"
+ help = "The batch size of write to or read from the database"
)
private int batchSize = DEFAULT_BATCH_SIZE;
@FieldDoc(
- required = false,
- defaultValue = "" + DEFAULT_BATCH_TIME_MS,
- help = "The batch operation interval in milliseconds")
+ defaultValue = "" + DEFAULT_BATCH_TIME_MS,
+ help = "The batch operation interval in milliseconds")
private long batchTimeMs = DEFAULT_BATCH_TIME_MS;
@@ -98,12 +96,15 @@
return cfg;
}
- public void validate() {
- if (StringUtils.isEmpty(mongoUri) || StringUtils.isEmpty(database) || StringUtils.isEmpty(collection)) {
+ /**
+ * Checks that this configuration is usable by a connector.
+ *
+ * <p>The Mongo URI is always mandatory. The database and collection names are only
+ * enforced when the caller asks for it, which lets the sink require both while the
+ * source may leave them empty.
+ *
+ * @param dbRequired whether an empty database name must be rejected
+ * @param collectionRequired whether an empty collection name must be rejected
+ * @throws IllegalArgumentException if a required property is empty, or if the batch size
+ * or the batch time is not strictly positive
+ */
+ public void validate(boolean dbRequired, boolean collectionRequired) {
+ if (StringUtils.isEmpty(getMongoUri()) ||
+ (dbRequired && StringUtils.isEmpty(getDatabase())) ||
+ (collectionRequired && StringUtils.isEmpty(getCollection()))) {
+
throw new IllegalArgumentException("Required property not set.");
}
- Preconditions.checkArgument(batchSize > 0, "batchSize must be a positive integer.");
- Preconditions.checkArgument(batchTimeMs > 0, "batchTimeMs must be a positive long.");
+ Preconditions.checkArgument(getBatchSize() > 0, "batchSize must be a positive integer.");
+ Preconditions.checkArgument(getBatchTimeMs() > 0, "batchTimeMs must be a positive long.");
}
}
diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
index 7dc665d..2bfcc33 100644
--- a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
+++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSink.java
@@ -18,14 +18,21 @@
*/
package org.apache.pulsar.io.mongodb;
-import static java.util.stream.Collectors.toList;
-
import com.google.common.collect.Lists;
import com.mongodb.MongoBulkWriteException;
import com.mongodb.async.client.MongoClient;
import com.mongodb.async.client.MongoClients;
import com.mongodb.async.client.MongoCollection;
import com.mongodb.async.client.MongoDatabase;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.Sink;
+import org.apache.pulsar.io.core.SinkContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+import org.bson.BSONException;
+import org.bson.Document;
+import org.bson.json.JsonParseException;
import java.nio.charset.Charset;
import java.util.ArrayList;
@@ -38,16 +45,7 @@
import java.util.function.Supplier;
import java.util.stream.IntStream;
-import lombok.extern.slf4j.Slf4j;
-
-import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.core.SinkContext;
-import org.apache.pulsar.io.core.annotations.Connector;
-import org.apache.pulsar.io.core.annotations.IOType;
-import org.bson.BSONException;
-import org.bson.Document;
-import org.bson.json.JsonParseException;
+import static java.util.stream.Collectors.toList;
/**
* The base class for MongoDB sinks.
@@ -88,7 +86,7 @@
log.info("Open MongoDB Sink");
mongoConfig = MongoConfig.load(config);
- mongoConfig.validate();
+ mongoConfig.validate(true, true);
if (clientProvider != null) {
mongoClient = clientProvider.get();
diff --git a/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java
new file mode 100644
index 0000000..2ce1475
--- /dev/null
+++ b/pulsar-io/mongo/src/main/java/org/apache/pulsar/io/mongodb/MongoSource.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.io.mongodb;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.mongodb.client.model.changestream.ChangeStreamDocument;
+import com.mongodb.client.model.changestream.FullDocument;
+import com.mongodb.reactivestreams.client.*;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.PushSource;
+import org.apache.pulsar.io.core.SourceContext;
+import org.apache.pulsar.io.core.annotations.Connector;
+import org.apache.pulsar.io.core.annotations.IOType;
+import org.bson.Document;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+/**
+ * A Pulsar IO source that subscribes to a MongoDB change stream and pushes every change
+ * event to Pulsar as a JSON-encoded record.
+ *
+ * <p>The watched scope is derived from the configuration:
+ * <ul>
+ *   <li>no database configured: all databases are watched (a configured collection is
+ *       ignored in that case);</li>
+ *   <li>database configured without a collection: all collections of that database are
+ *       watched;</li>
+ *   <li>database and collection configured: only that collection is watched.</li>
+ * </ul>
+ */
+@Connector(
+    name = "mongo",
+    type = IOType.SOURCE,
+    help = "A source connector that sends mongodb documents to pulsar",
+    configClass = MongoConfig.class
+)
+@Slf4j
+public class MongoSource extends PushSource<byte[]> {
+
+    /** Optional factory for the client; when {@code null} the client is built from the configured URI. */
+    private final Supplier<MongoClient> clientProvider;
+
+    private MongoConfig mongoConfig;
+
+    private MongoClient mongoClient;
+
+    private ChangeStreamPublisher<Document> stream;
+
+
+    /**
+     * Creates a source that builds its own client from the configured Mongo URI.
+     */
+    public MongoSource() {
+        this(null);
+    }
+
+    /**
+     * Creates a source that obtains its client from the given provider.
+     *
+     * @param clientProvider supplier of the client to use, or {@code null} to create the
+     *                       client from the configured Mongo URI
+     */
+    public MongoSource(Supplier<MongoClient> clientProvider) {
+        this.clientProvider = clientProvider;
+    }
+
+    /**
+     * Loads and validates the configuration, connects to MongoDB and subscribes to the
+     * change stream selected by the configuration.
+     *
+     * @param config        the raw connector configuration
+     * @param sourceContext the context of this source (not used here)
+     * @throws Exception if the configuration cannot be loaded or is invalid
+     */
+    @Override
+    public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
+        log.info("Open MongoDB Source");
+
+        mongoConfig = MongoConfig.load(config);
+        // Neither the database nor the collection is mandatory for the source.
+        mongoConfig.validate(false, false);
+
+        if (clientProvider != null) {
+            mongoClient = clientProvider.get();
+        } else {
+            mongoClient = MongoClients.create(mongoConfig.getMongoUri());
+        }
+
+        stream = createChangeStream();
+
+        stream.batchSize(mongoConfig.getBatchSize()).fullDocument(FullDocument.UPDATE_LOOKUP);
+
+        stream.subscribe(new Subscriber<ChangeStreamDocument<Document>>() {
+            private final ObjectMapper mapper = new ObjectMapper();
+            private Subscription subscription;
+
+            @Override
+            public void onSubscribe(Subscription subscription) {
+                this.subscription = subscription;
+                // Long.MAX_VALUE signals unbounded demand. Demand is requested only once, so
+                // any smaller bound would silently stall the stream once it is used up.
+                this.subscription.request(Long.MAX_VALUE);
+            }
+
+            @Override
+            public void onNext(ChangeStreamDocument<Document> doc) {
+                try {
+                    // Logged at debug level: this runs for every single change event.
+                    log.debug("New change doc: {}", doc);
+
+                    // Build a record with the essential information
+                    final Map<String, Object> recordValue = new HashMap<>();
+                    recordValue.put("fullDocument", doc.getFullDocument());
+                    recordValue.put("ns", doc.getNamespace());
+                    recordValue.put("operation", doc.getOperationType());
+
+                    // Events that do not target a single document (e.g. drop or invalidate)
+                    // have no document key; publish them without a record key instead of
+                    // throwing a NullPointerException out of onNext.
+                    final Optional<String> key = Optional.ofNullable(doc.getDocumentKey())
+                            .map(documentKey -> documentKey.toJson());
+
+                    consume(new DocRecord(
+                            key,
+                            mapper.writeValueAsString(recordValue).getBytes(StandardCharsets.UTF_8)));
+
+                } catch (JsonProcessingException e) {
+                    log.error("Processing doc from mongo", e);
+                }
+            }
+
+            @Override
+            public void onError(Throwable error) {
+                log.error("Subscriber error", error);
+            }
+
+            @Override
+            public void onComplete() {
+                log.info("Subscriber complete");
+            }
+        });
+
+    }
+
+    /**
+     * Closes the MongoDB client if one was created.
+     */
+    @Override
+    public void close() throws Exception {
+        if (mongoClient != null) {
+            mongoClient.close();
+        }
+    }
+
+    /**
+     * Selects what to watch: everything, one database, or one collection, depending on
+     * which of the database and collection names are configured.
+     */
+    private ChangeStreamPublisher<Document> createChangeStream() {
+        if (StringUtils.isEmpty(mongoConfig.getDatabase())) {
+            // Watch all databases
+            log.info("Watch all");
+            return mongoClient.watch();
+        }
+
+        final MongoDatabase db = mongoClient.getDatabase(mongoConfig.getDatabase());
+
+        if (StringUtils.isEmpty(mongoConfig.getCollection())) {
+            // Watch all collections in a database
+            log.info("Watch db: {}", db.getName());
+            return db.watch();
+        }
+
+        // Watch a collection
+        final MongoCollection<Document> collection = db.getCollection(mongoConfig.getCollection());
+        log.info("Watch collection: {} {}", db.getName(), mongoConfig.getCollection());
+        return collection.watch();
+    }
+
+    /**
+     * Record emitted for a change event: the JSON form of the document key (when the event
+     * has one) as the record key, and the UTF-8 encoded JSON payload as the value.
+     */
+    @Data
+    private static class DocRecord implements Record<byte[]> {
+        private final Optional<String> key;
+        private final byte[] value;
+    }
+}
diff --git a/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml
index a1cf293..bc32292 100644
--- a/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/mongo/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -17,5 +17,6 @@
# under the License.
#
name: mongo
-description: Writes data into MongoDB
+description: MongoDB source and sink connector
sinkClass: org.apache.pulsar.io.mongodb.MongoSink
+sourceClass: org.apache.pulsar.io.mongodb.MongoSource
diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoConfigTest.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoConfigTest.java
index 9f98ee4..8495d87 100644
--- a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoConfigTest.java
+++ b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoConfigTest.java
@@ -50,7 +50,7 @@
final Map<String, Object> map = TestHelper.createMap(false);
final MongoConfig cfg = MongoConfig.load(map);
- cfg.validate();
+ cfg.validate(true, true);
}
@Test(expectedExceptions = IllegalArgumentException.class,
@@ -60,7 +60,7 @@
map.put("batchSize", 0);
final MongoConfig cfg = MongoConfig.load(map);
- cfg.validate();
+ cfg.validate(true, true);
}
@Test(expectedExceptions = IllegalArgumentException.class,
@@ -70,7 +70,7 @@
map.put("batchTimeMs", 0);
final MongoConfig cfg = MongoConfig.load(map);
- cfg.validate();
+ cfg.validate(true, true);
}
@Test
diff --git a/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceTest.java b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceTest.java
new file mode 100644
index 0000000..63e07e2
--- /dev/null
+++ b/pulsar-io/mongo/src/test/java/org/apache/pulsar/io/mongodb/MongoSourceTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.io.mongodb;
+
+import com.mongodb.MongoNamespace;
+import com.mongodb.client.model.changestream.ChangeStreamDocument;
+import com.mongodb.client.model.changestream.OperationType;
+import com.mongodb.reactivestreams.client.ChangeStreamPublisher;
+import com.mongodb.reactivestreams.client.MongoClient;
+import com.mongodb.reactivestreams.client.MongoCollection;
+import com.mongodb.reactivestreams.client.MongoDatabase;
+import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.io.core.SourceContext;
+import org.bson.BsonDocument;
+import org.bson.BsonString;
+import org.bson.Document;
+import org.mockito.Mock;
+import org.reactivestreams.Subscriber;
+import org.testng.IObjectFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.ObjectFactory;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.*;
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link MongoSource}, run against a fully mocked MongoDB client so that no
+ * MongoDB server is needed.
+ */
+public class MongoSourceTest {
+
+    @Mock
+    private SourceContext mockSourceContext;
+
+    @Mock
+    private MongoClient mockMongoClient;
+
+    @Mock
+    private MongoDatabase mockMongoDb;
+
+    @Mock
+    private MongoCollection mockMongoColl;
+
+    @Mock
+    private ChangeStreamPublisher mockPublisher;
+
+    // The subscriber the source passes to the mocked publisher; captured by the stub in setUp().
+    private Subscriber subscriber;
+
+    private MongoSource source;
+
+    private Map<String, Object> map;
+
+
+    @ObjectFactory
+    public IObjectFactory getObjectFactory() {
+        return new org.powermock.modules.testng.PowerMockObjectFactory();
+    }
+
+    /**
+     * Builds the mock chain client -> database -> collection -> change stream publisher and
+     * records the subscriber handed to the publisher, so tests can feed it change events.
+     */
+    @BeforeMethod
+    public void setUp() {
+
+        map = TestHelper.createMap(true);
+
+        mockSourceContext = mock(SourceContext.class);
+        mockMongoClient = mock(MongoClient.class);
+        mockMongoDb = mock(MongoDatabase.class);
+        mockMongoColl = mock(MongoCollection.class);
+        mockPublisher = mock(ChangeStreamPublisher.class);
+
+        source = new MongoSource(() -> mockMongoClient);
+
+        when(mockMongoClient.getDatabase(anyString())).thenReturn(mockMongoDb);
+        when(mockMongoDb.getCollection(anyString())).thenReturn(mockMongoColl);
+        when(mockMongoColl.watch()).thenReturn(mockPublisher);
+        when(mockPublisher.batchSize(anyInt())).thenReturn(mockPublisher);
+        when(mockPublisher.fullDocument(any())).thenReturn(mockPublisher);
+
+        doAnswer((invocation) -> {
+            subscriber = invocation.getArgument(0, Subscriber.class);
+            return null;
+        }).when(mockPublisher).subscribe(any());
+    }
+
+    /**
+     * Closes the source and checks that the underlying client was closed exactly once.
+     */
+    @AfterMethod
+    public void tearDown() throws Exception {
+        source.close();
+        verify(mockMongoClient, times(1)).close();
+    }
+
+    /**
+     * Opening the source against the mocked client must not throw.
+     */
+    @Test
+    public void testOpen() throws Exception {
+        source.open(map, mockSourceContext);
+    }
+
+    /**
+     * Feeds one insert change event to the captured subscriber and checks the JSON payload
+     * of the record the source emits. Despite its name, this test exercises a well-formed
+     * event.
+     */
+    @Test
+    public void testWriteBadMessage() throws Exception {
+
+        source.open(map, mockSourceContext);
+
+        subscriber.onNext(new ChangeStreamDocument<Document>(null, new MongoNamespace("hello.pulsar"),
+                new Document("hello", "pulsar"), new BsonDocument("_id", new BsonString("id")), OperationType.INSERT, null));
+
+        Record<byte[]> record = source.read();
+
+        // The source encodes the payload as UTF-8, so decode it explicitly as UTF-8 rather
+        // than with the platform default charset.
+        assertEquals(new String(record.getValue(), java.nio.charset.StandardCharsets.UTF_8),
+                "{\"fullDocument\":{\"hello\":\"pulsar\"},"
+                + "\"ns\":{\"databaseName\":\"hello\",\"collectionName\":\"pulsar\",\"fullName\":\"hello.pulsar\"},"
+                + "\"operation\":\"INSERT\"}");
+    }
+}