blob: 7f9f0ed7e460e59006c67293f485c439c1ee972e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.mongodb;
import static org.junit.Assert.assertEquals;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import de.flapdoodle.embed.mongo.MongodExecutable;
import de.flapdoodle.embed.mongo.MongodProcess;
import de.flapdoodle.embed.mongo.MongodStarter;
import de.flapdoodle.embed.mongo.config.IMongodConfig;
import de.flapdoodle.embed.mongo.config.MongoCmdOptionsBuilder;
import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
import de.flapdoodle.embed.mongo.config.Net;
import de.flapdoodle.embed.mongo.config.Storage;
import de.flapdoodle.embed.mongo.distribution.Version;
import de.flapdoodle.embed.process.runtime.Network;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.io.common.NetworkTestHelper;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonString;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Test on the MongoDbIO. */
@RunWith(JUnit4.class)
public class MongoDbIOTest {
private static final Logger LOG = LoggerFactory.getLogger(MongoDbIOTest.class);
@ClassRule public static final TemporaryFolder MONGODB_LOCATION = new TemporaryFolder();
private static final String DATABASE = "beam";
private static final String COLLECTION = "test";
private static final MongodStarter mongodStarter = MongodStarter.getDefaultInstance();
private static MongodExecutable mongodExecutable;
private static MongodProcess mongodProcess;
private static MongoClient client;
private static int port;
@Rule public final TestPipeline pipeline = TestPipeline.create();
@BeforeClass
public static void beforeClass() throws Exception {
port = NetworkTestHelper.getAvailableLocalPort();
LOG.info("Starting MongoDB embedded instance on {}", port);
IMongodConfig mongodConfig =
new MongodConfigBuilder()
.version(Version.Main.PRODUCTION)
.configServer(false)
.replication(new Storage(MONGODB_LOCATION.getRoot().getPath(), null, 0))
.net(new Net("localhost", port, Network.localhostIsIPv6()))
.cmdOptions(
new MongoCmdOptionsBuilder()
.syncDelay(10)
.useNoPrealloc(true)
.useSmallFiles(true)
.useNoJournal(true)
.verbose(false)
.build())
.build();
mongodExecutable = mongodStarter.prepare(mongodConfig);
mongodProcess = mongodExecutable.start();
client = new MongoClient("localhost", port);
LOG.info("Insert test data");
List<Document> documents = createDocuments(1000);
MongoCollection<Document> collection = getCollection(COLLECTION);
collection.insertMany(documents);
}
@AfterClass
public static void afterClass() {
LOG.info("Stopping MongoDB instance");
client.close();
mongodProcess.stop();
mongodExecutable.stop();
}
@Test
public void testSplitIntoFilters() {
// A single split will result in two filters
ArrayList<Document> documents = new ArrayList<>();
documents.add(new Document("_id", 56));
List<String> filters = MongoDbIO.BoundedMongoDbSource.splitKeysToFilters(documents);
assertEquals(2, filters.size());
assertEquals("{ $and: [ {\"_id\":{$lte:ObjectId(\"56\")}} ]}", filters.get(0));
assertEquals("{ $and: [ {\"_id\":{$gt:ObjectId(\"56\")}} ]}", filters.get(1));
// Add two more splits; now we should have 4 filters
documents.add(new Document("_id", 109));
documents.add(new Document("_id", 256));
filters = MongoDbIO.BoundedMongoDbSource.splitKeysToFilters(documents);
assertEquals(4, filters.size());
assertEquals("{ $and: [ {\"_id\":{$lte:ObjectId(\"56\")}} ]}", filters.get(0));
assertEquals(
"{ $and: [ {\"_id\":{$gt:ObjectId(\"56\"),$lte:ObjectId(\"109\")}} ]}", filters.get(1));
assertEquals(
"{ $and: [ {\"_id\":{$gt:ObjectId(\"109\"),$lte:ObjectId(\"256\")}} ]}", filters.get(2));
assertEquals("{ $and: [ {\"_id\":{$gt:ObjectId(\"256\")}} ]}", filters.get(3));
}
@Test
public void testSplitIntoBucket() {
// a single split should result in two buckets
ArrayList<Document> documents = new ArrayList<>();
documents.add(new Document("_id", new ObjectId("52cc8f6254c5317943000005")));
List<BsonDocument> buckets = MongoDbIO.BoundedMongoDbSource.splitKeysToMatch(documents);
assertEquals(2, buckets.size());
assertEquals(
"{ \"$match\" : { \"_id\" : { \"$lte\" : { \"$oid\" : \"52cc8f6254c5317943000005\" } } } }",
buckets.get(0).toString());
assertEquals(
"{ \"$match\" : { \"_id\" : { \"$gt\" : { \"$oid\" : \"52cc8f6254c5317943000005\" } } } }",
buckets.get(1).toString());
// add more splits and verify the buckets
documents.add(new Document("_id", new ObjectId("52cc8f6254c5317943000007")));
documents.add(new Document("_id", new ObjectId("54242e9e54c531ef8800001f")));
buckets = MongoDbIO.BoundedMongoDbSource.splitKeysToMatch(documents);
assertEquals(4, buckets.size());
assertEquals(
"{ \"$match\" : { \"_id\" : { \"$lte\" : { \"$oid\" : \"52cc8f6254c5317943000005\" } } } }",
buckets.get(0).toString());
assertEquals(
"{ \"$match\" : { \"_id\" : { \"$gt\" : { \"$oid\" : \"52cc8f6254c5317943000005\" }, \"$lte\" : { \"$oid\" : \"52cc8f6254c5317943000007\" } } } }",
buckets.get(1).toString());
assertEquals(
"{ \"$match\" : { \"_id\" : { \"$gt\" : { \"$oid\" : \"52cc8f6254c5317943000007\" }, \"$lte\" : { \"$oid\" : \"54242e9e54c531ef8800001f\" } } } }",
buckets.get(2).toString());
assertEquals(
"{ \"$match\" : { \"_id\" : { \"$gt\" : { \"$oid\" : \"54242e9e54c531ef8800001f\" } } } }",
buckets.get(3).toString());
}
@Test
public void testBuildAutoBuckets() {
List<BsonDocument> aggregates = new ArrayList<BsonDocument>();
aggregates.add(
new BsonDocument(
"$match",
new BsonDocument("country", new BsonDocument("$eq", new BsonString("England")))));
MongoDbIO.Read spec =
MongoDbIO.read()
.withUri("mongodb://localhost:" + port)
.withDatabase(DATABASE)
.withCollection(COLLECTION)
.withQueryFn(AggregationQuery.create().withMongoDbPipeline(aggregates));
MongoDatabase database = client.getDatabase(DATABASE);
List<Document> buckets = MongoDbIO.BoundedMongoDbSource.buildAutoBuckets(database, spec);
assertEquals(10, buckets.size());
}
@Test
public void testFullRead() {
PCollection<Document> output =
pipeline.apply(
MongoDbIO.read()
.withUri("mongodb://localhost:" + port)
.withDatabase(DATABASE)
.withCollection(COLLECTION));
PAssert.thatSingleton(output.apply("Count All", Count.globally())).isEqualTo(1000L);
PAssert.that(
output
.apply("Map Scientist", MapElements.via(new DocumentToKVFn()))
.apply("Count Scientist", Count.perKey()))
.satisfies(
input -> {
for (KV<String, Long> element : input) {
assertEquals(100L, element.getValue().longValue());
}
return null;
});
pipeline.run();
}
@Test
public void testReadWithCustomConnectionOptions() {
MongoDbIO.Read read =
MongoDbIO.read()
.withUri("mongodb://localhost:" + port)
.withMaxConnectionIdleTime(10)
.withDatabase(DATABASE)
.withCollection(COLLECTION);
assertEquals(10, read.maxConnectionIdleTime());
PCollection<Document> documents = pipeline.apply(read);
PAssert.thatSingleton(documents.apply("Count All", Count.globally())).isEqualTo(1000L);
PAssert.that(
documents
.apply("Map Scientist", MapElements.via(new DocumentToKVFn()))
.apply("Count Scientist", Count.perKey()))
.satisfies(
input -> {
for (KV<String, Long> element : input) {
assertEquals(100L, element.getValue().longValue());
}
return null;
});
pipeline.run();
}
@Test
public void testReadWithFilter() {
PCollection<Document> output =
pipeline.apply(
MongoDbIO.read()
.withUri("mongodb://localhost:" + port)
.withDatabase(DATABASE)
.withCollection(COLLECTION)
.withQueryFn(FindQuery.create().withFilters(Filters.eq("scientist", "Einstein"))));
PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(100L);
pipeline.run();
}
@Test
public void testReadWithFilterAndLimit() throws Exception {
PCollection<Document> output =
pipeline.apply(
MongoDbIO.read()
.withUri("mongodb://localhost:" + port)
.withDatabase(DATABASE)
.withCollection(COLLECTION)
.withNumSplits(10)
.withQueryFn(
FindQuery.create()
.withFilters(Filters.eq("scientist", "Einstein"))
.withLimit(5)));
PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(5L);
pipeline.run();
}
@Test
public void testReadWithAggregate() throws Exception {
List<BsonDocument> aggregates = new ArrayList<BsonDocument>();
aggregates.add(
new BsonDocument(
"$match",
new BsonDocument("country", new BsonDocument("$eq", new BsonString("England")))));
PCollection<Document> output =
pipeline.apply(
MongoDbIO.read()
.withUri("mongodb://localhost:" + port)
.withDatabase(DATABASE)
.withCollection(COLLECTION)
.withQueryFn(AggregationQuery.create().withMongoDbPipeline(aggregates)));
PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(300L);
pipeline.run();
}
@Test
public void testReadWithAggregateWithLimit() throws Exception {
List<BsonDocument> aggregates = new ArrayList<BsonDocument>();
aggregates.add(
new BsonDocument(
"$match",
new BsonDocument("country", new BsonDocument("$eq", new BsonString("England")))));
aggregates.add(new BsonDocument("$limit", new BsonInt32(10)));
PCollection<Document> output =
pipeline.apply(
MongoDbIO.read()
.withUri("mongodb://localhost:" + port)
.withDatabase(DATABASE)
.withCollection(COLLECTION)
.withQueryFn(AggregationQuery.create().withMongoDbPipeline(aggregates)));
PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(10L);
pipeline.run();
}
@Test
public void testWrite() {
final String collectionName = "testWrite";
final int numElements = 1000;
pipeline
.apply(Create.of(createDocuments(numElements)))
.apply(
MongoDbIO.write()
.withUri("mongodb://localhost:" + port)
.withDatabase(DATABASE)
.withCollection(collectionName));
pipeline.run();
assertEquals(numElements, countElements(collectionName));
}
@Test
public void testWriteUnordered() {
final String collectionName = "testWriteUnordered";
Document doc =
Document.parse("{\"_id\":\"521df3a4300466f1f2b5ae82\",\"scientist\":\"Test %s\"}");
pipeline
.apply(Create.of(doc, doc))
.apply(
MongoDbIO.write()
.withUri("mongodb://localhost:" + port)
.withDatabase(DATABASE)
.withOrdered(false)
.withCollection(collectionName));
pipeline.run();
assertEquals(1, countElements(collectionName));
}
private static List<Document> createDocuments(final int n) {
final String[] scientists =
new String[] {
"Einstein",
"Darwin",
"Copernicus",
"Pasteur",
"Curie",
"Faraday",
"Newton",
"Bohr",
"Galilei",
"Maxwell"
};
final String[] country =
new String[] {
"Germany",
"England",
"Poland",
"France",
"France",
"England",
"England",
"Denmark",
"Florence",
"Scotland"
};
List<Document> documents = new ArrayList<>();
for (int i = 1; i <= n; i++) {
int index = i % scientists.length;
Document document = new Document();
document.append("scientist", scientists[index]);
document.append("country", country[index]);
documents.add(document);
}
return documents;
}
private static int countElements(final String collectionName) {
return Iterators.size(getCollection(collectionName).find().iterator());
}
private static MongoCollection<Document> getCollection(final String collectionName) {
MongoDatabase database = client.getDatabase(DATABASE);
return database.getCollection(collectionName);
}
static class DocumentToKVFn extends SimpleFunction<Document, KV<String, Void>> {
@Override
public KV<String, Void> apply(Document input) {
return KV.of(input.getString("scientist"), null);
}
}
}