blob: 1666a4e663001b3a93cb0018849059b561b46508 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.mongodb;
import com.mongodb.client.MongoCursor;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.mongodb.MongoDBClientService;
import org.apache.nifi.mongodb.MongoDBControllerService;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class PutMongoIT extends MongoWriteTestBase {
@BeforeEach
public void setup() {
super.setup(PutMongo.class);
}
@Override
@AfterEach
public void teardown() {
super.teardown();
}
private byte[] documentToByteArray(Document doc) {
return doc.toJson().getBytes(StandardCharsets.UTF_8);
}
@Test
public void testValidators() {
TestRunner runner = TestRunners.newTestRunner(PutMongo.class);
Collection<ValidationResult> results;
ProcessContext pc;
// missing uri, db, collection
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
results = new HashSet<>();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
assertEquals(2, results.size());
Iterator<ValidationResult> it = results.iterator();
Assertions.assertTrue(it.next().toString().contains("is invalid because Mongo Database Name is required"));
Assertions.assertTrue(it.next().toString().contains("is invalid because Mongo Collection Name is required"));
// invalid write concern
runner.setProperty(AbstractMongoProcessor.URI, MONGO_URI);
runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DATABASE_NAME);
runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME);
runner.setProperty(PutMongo.WRITE_CONCERN, "xyz");
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
results = new HashSet<>();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
assertEquals(1, results.size());
Assertions.assertTrue(results.iterator().next().toString().matches("'Write Concern' .* is invalid because Given value not found in allowed set .*"));
// valid write concern
runner.setProperty(PutMongo.WRITE_CONCERN, PutMongo.WRITE_CONCERN_UNACKNOWLEDGED);
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
results = new HashSet<>();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
assertEquals(0, results.size());
}
@Test
public void testQueryAndUpdateKey() {
TestRunner runner = init(PutMongo.class);
runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
runner.setProperty(PutMongo.UPDATE_QUERY, "{}");
runner.assertNotValid();
}
@Test
public void testNoQueryAndNoUpdateKey() {
TestRunner runner = init(PutMongo.class);
runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
runner.removeProperty(PutMongo.UPDATE_QUERY);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "");
runner.assertNotValid();
}
@Test
public void testBlankUpdateKey() {
TestRunner runner = init(PutMongo.class);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, " ");
runner.assertNotValid();
}
@Test
public void testUpdateQuery() {
TestRunner runner = init(PutMongo.class);
Document document = new Document()
.append("name", "John Smith")
.append("department", "Engineering");
collection.insertOne(document);
String updateBody = "{\n" +
"\t\"$set\": {\n" +
"\t\t\"email\": \"john.smith@test.com\",\n" +
"\t\t\"grade\": \"Sr. Principle Eng.\"\n" +
"\t},\n" +
"\t\"$inc\": {\n" +
"\t\t\"writes\": 1\n" +
"\t}\n" +
"}";
Map<String, String> attr = new HashMap<>();
attr.put("mongo.update.query", document.toJson());
runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS);
runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
runner.setProperty(PutMongo.UPDATE_QUERY, "${mongo.update.query}");
runner.setValidateExpressionUsage(true);
runner.enqueue(updateBody, attr);
updateTests(runner, document);
}
@Test
public void testUpdateBySimpleKey() {
TestRunner runner = init(PutMongo.class);
Document document = new Document()
.append("name", "John Smith")
.append("department", "Engineering");
collection.insertOne(document);
String updateBody = "{\n" +
"\t\"name\": \"John Smith\",\n" +
"\t\"$set\": {\n" +
"\t\t\"email\": \"john.smith@test.com\",\n" +
"\t\t\"grade\": \"Sr. Principle Eng.\"\n" +
"\t},\n" +
"\t\"$inc\": {\n" +
"\t\t\"writes\": 1\n" +
"\t}\n" +
"}";
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "name");
runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS);
runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
runner.setValidateExpressionUsage(true);
runner.enqueue(updateBody);
updateTests(runner, document);
}
@Test
public void testUpdateWithFullDocByKeys() {
TestRunner runner = init(PutMongo.class);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "name,department");
testUpdateFullDocument(runner);
}
@Test
public void testUpdateWithFullDocByQuery() {
TestRunner runner = init(PutMongo.class);
String query = "{ \"name\": \"John Smith\"}";
runner.setProperty(PutMongo.UPDATE_QUERY, query);
testUpdateFullDocument(runner);
}
private void testUpdateFullDocument(TestRunner runner) {
Document document = new Document()
.append("name", "John Smith")
.append("department", "Engineering");
collection.insertOne(document);
String updateBody = "{\n" +
"\t\"name\": \"John Smith\",\n" +
"\t\"department\": \"Engineering\",\n" +
"\t\"contacts\": {\n" +
"\t\t\"phone\": \"555-555-5555\",\n" +
"\t\t\"email\": \"john.smith@test.com\",\n" +
"\t\t\"twitter\": \"@JohnSmith\"\n" +
"\t}\n" +
"}";
runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_DOC);
runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
runner.setValidateExpressionUsage(true);
runner.enqueue(updateBody);
runner.run();
runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
runner.assertTransferCount(PutMongo.REL_SUCCESS, 1);
MongoCursor<Document> cursor = collection.find(document).iterator();
Document found = cursor.next();
assertEquals(found.get("name"), document.get("name"));
assertEquals(found.get("department"), document.get("department"));
Document contacts = (Document)found.get("contacts");
Assertions.assertNotNull(contacts);
assertEquals(contacts.get("twitter"), "@JohnSmith");
assertEquals(contacts.get("email"), "john.smith@test.com");
assertEquals(contacts.get("phone"), "555-555-5555");
assertEquals(collection.count(document), 1);
}
@Test
public void testUpdateByComplexKey() {
TestRunner runner = init(PutMongo.class);
Document document = new Document()
.append("name", "John Smith")
.append("department", "Engineering")
.append("contacts", new Document().append("email", "john.smith@test.com")
.append("phone", "555-555-5555"));
collection.insertOne(document);
String updateBody = "{\n" +
"\t\"contacts.phone\": \"555-555-5555\",\n" +
"\t\"contacts.email\": \"john.smith@test.com\",\n" +
"\t\"$set\": {\n" +
"\t\t\"contacts.twitter\": \"@JohnSmith\"\n" +
"\t},\n" +
"\t\"$inc\": {\n" +
"\t\t\"writes\": 1\n" +
"\t}\n" +
"}";
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "contacts.phone,contacts.email");
runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS);
runner.setProperty(PutMongo.MODE, PutMongo.MODE_UPDATE);
runner.setValidateExpressionUsage(true);
runner.enqueue(updateBody);
runner.run();
runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
runner.assertTransferCount(PutMongo.REL_SUCCESS, 1);
MongoCursor<Document> iterator = collection.find(new Document("name", "John Smith")).iterator();
Assertions.assertTrue(iterator.hasNext(), "Document did not come back.");
Document val = iterator.next();
Map contacts = (Map)val.get("contacts");
Assertions.assertNotNull(contacts);
Assertions.assertTrue(contacts.containsKey("twitter") && contacts.get("twitter").equals("@JohnSmith"));
Assertions.assertTrue(val.containsKey("writes") && val.get("writes").equals(1));
}
private void updateTests(TestRunner runner, Document document) {
runner.run();
runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
runner.assertTransferCount(PutMongo.REL_SUCCESS, 1);
MongoCursor<Document> iterator = collection.find(document).iterator();
Assertions.assertTrue(iterator.hasNext(), "Document did not come back.");
Document val = iterator.next();
Assertions.assertTrue(val.containsKey("email") && val.get("email").equals("john.smith@test.com"));
Assertions.assertTrue(val.containsKey("grade") && val.get("grade").equals("Sr. Principle Eng."));
Assertions.assertTrue(val.containsKey("writes") && val.get("writes").equals(1));
}
@Test
public void testInsertOne() throws Exception {
TestRunner runner = init(PutMongo.class);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
Document doc = DOCUMENTS.get(0);
byte[] bytes = documentToByteArray(doc);
runner.enqueue(bytes);
runner.run();
runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1);
MockFlowFile out = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0);
out.assertContentEquals(bytes);
// verify 1 doc inserted into the collection
assertEquals(1, collection.count());
assertEquals(doc, collection.find().first());
}
@Test
public void testInsertMany() throws Exception {
TestRunner runner = init(PutMongo.class);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
for (Document doc : DOCUMENTS) {
runner.enqueue(documentToByteArray(doc));
}
runner.run(3);
runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 3);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS);
for (int i=0; i < flowFiles.size(); i++) {
flowFiles.get(i).assertContentEquals(DOCUMENTS.get(i).toJson());
}
// verify 3 docs inserted into the collection
assertEquals(3, collection.count());
}
@Test
public void testInsertWithDuplicateKey() throws Exception {
TestRunner runner = init(PutMongo.class);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
// pre-insert one document
collection.insertOne(DOCUMENTS.get(0));
for (Document doc : DOCUMENTS) {
runner.enqueue(documentToByteArray(doc));
}
runner.run(3);
// first doc failed, other 2 succeeded
runner.assertTransferCount(PutMongo.REL_FAILURE, 1);
MockFlowFile out = runner.getFlowFilesForRelationship(PutMongo.REL_FAILURE).get(0);
out.assertContentEquals(documentToByteArray(DOCUMENTS.get(0)));
runner.assertTransferCount(PutMongo.REL_SUCCESS, 2);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS);
for (int i=0; i < flowFiles.size(); i++) {
flowFiles.get(i).assertContentEquals(DOCUMENTS.get(i+1).toJson());
}
// verify 2 docs inserted into the collection for a total of 3
assertEquals(3, collection.count());
}
/**
* Verifies that 'update' does not insert if 'upsert' if false.
* @see #testUpsert()
*/
@Test
public void testUpdateDoesNotInsert() throws Exception {
TestRunner runner = init(PutMongo.class);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
Document doc = DOCUMENTS.get(0);
byte[] bytes = documentToByteArray(doc);
runner.setProperty(PutMongo.MODE, "update");
runner.enqueue(bytes);
runner.run();
runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1);
MockFlowFile out = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0);
out.assertContentEquals(bytes);
// nothing was in collection, so nothing to update since upsert defaults to false
assertEquals(0, collection.count());
}
/**
* Verifies that 'update' does insert if 'upsert' is true.
* @see #testUpdateDoesNotInsert()
*/
@Test
public void testUpsert() throws Exception {
TestRunner runner = init(PutMongo.class);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
Document doc = DOCUMENTS.get(0);
byte[] bytes = documentToByteArray(doc);
runner.setProperty(PutMongo.MODE, "update");
runner.setProperty(PutMongo.UPSERT, "true");
runner.enqueue(bytes);
runner.run();
runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1);
MockFlowFile out = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0);
out.assertContentEquals(bytes);
// verify 1 doc inserted into the collection
assertEquals(1, collection.count());
assertEquals(doc, collection.find().first());
}
@Test
public void testUpsertWithOid() throws Exception {
TestRunner runner = init(PutMongo.class);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
byte[] bytes = documentToByteArray(oidDocument);
runner.setProperty(PutMongo.MODE, "update");
runner.setProperty(PutMongo.UPSERT, "true");
runner.enqueue(bytes);
runner.run();
runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1);
MockFlowFile out = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0);
out.assertContentEquals(bytes);
// verify 1 doc inserted into the collection
assertEquals(1, collection.count());
assertEquals(oidDocument, collection.find().first());
}
@Test
public void testUpdate() throws Exception {
TestRunner runner = init(PutMongo.class);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
Document doc = DOCUMENTS.get(0);
// pre-insert document
collection.insertOne(doc);
// modify the object
doc.put("abc", "123");
doc.put("xyz", "456");
doc.remove("c");
byte[] bytes = documentToByteArray(doc);
runner.setProperty(PutMongo.MODE, "update");
runner.enqueue(bytes);
runner.run();
runner.assertAllFlowFilesTransferred(PutMongo.REL_SUCCESS, 1);
MockFlowFile out = runner.getFlowFilesForRelationship(PutMongo.REL_SUCCESS).get(0);
out.assertContentEquals(bytes);
assertEquals(1, collection.count());
assertEquals(doc, collection.find().first());
}
@Test
public void testUpsertWithOperators() throws Exception {
TestRunner runner = init(PutMongo.class);
String upsert = "{\n" +
" \"_id\": \"Test\",\n" +
" \"$push\": {\n" +
" \"testArr\": { \"msg\": \"Hi\" }\n" +
" }\n" +
"}";
runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS);
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, "_id");
runner.setProperty(PutMongo.MODE, "update");
runner.setProperty(PutMongo.UPSERT, "true");
for (int x = 0; x < 3; x++) {
runner.enqueue(upsert.getBytes());
}
runner.run(3, true, true);
runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
runner.assertTransferCount(PutMongo.REL_SUCCESS, 3);
Document query = new Document("_id", "Test");
Document result = collection.find(query).first();
List array = (List)result.get("testArr");
Assertions.assertNotNull(array, "Array was empty");
assertEquals(3, array.size(), "Wrong size");
for (int index = 0; index < array.size(); index++) {
Document doc = (Document)array.get(index);
String msg = doc.getString("msg");
Assertions.assertNotNull("Msg was null", msg);
assertEquals(msg, "Hi", "Msg had wrong value");
}
}
/*
* Start NIFI-4759 Regression Tests
*
* 2 issues with ID field:
*
* * Assumed _id is the update key, causing failures when the user configured a different one in the UI.
* * Treated _id as a string even when it is an ObjectID sent from another processor as a string value.
*
* Expected behavior:
*
* * update key field should work no matter what (legal) value it is set to be.
* * _ids that are ObjectID should become real ObjectIDs when added to Mongo.
* * _ids that are arbitrary strings should be still go in as strings.
*
*/
@Test
public void testNiFi_4759_Regressions() {
TestRunner runner = init(PutMongo.class);
String[] upserts = new String[]{
"{ \"_id\": \"12345\", \"$set\": { \"msg\": \"Hello, world\" } }",
"{ \"_id\": \"5a5617b9c1f5de6d8276e87d\", \"$set\": { \"msg\": \"Hello, world\" } }",
"{ \"updateKey\": \"12345\", \"$set\": { \"msg\": \"Hello, world\" } }"
};
String[] updateKeyProps = new String[] { "_id", "_id", "updateKey" };
Object[] updateKeys = new Object[] { "12345", new ObjectId("5a5617b9c1f5de6d8276e87d"), "12345" };
int index = 0;
runner.setProperty(PutMongo.UPDATE_MODE, PutMongo.UPDATE_WITH_OPERATORS);
runner.setProperty(PutMongo.MODE, "update");
runner.setProperty(PutMongo.UPSERT, "true");
final int LIMIT = 2;
for (String upsert : upserts) {
runner.setProperty(PutMongo.UPDATE_QUERY_KEY, updateKeyProps[index]);
for (int x = 0; x < LIMIT; x++) {
runner.enqueue(upsert);
}
runner.run(LIMIT, true, true);
runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
runner.assertTransferCount(PutMongo.REL_SUCCESS, LIMIT);
Document query = new Document(updateKeyProps[index], updateKeys[index]);
Document result = collection.find(query).first();
Assertions.assertNotNull(result, "Result was null");
assertEquals(1, collection.count(query), "Count was wrong");
runner.clearTransferState();
index++;
}
}
@Test
public void testClientService() throws Exception {
MongoDBClientService clientService = new MongoDBControllerService();
TestRunner runner = init(PutMongo.class);
runner.addControllerService("clientService", clientService);
runner.removeProperty(PutMongo.URI);
runner.setProperty(clientService, MongoDBControllerService.URI, MONGO_URI);
runner.setProperty(PutMongo.CLIENT_SERVICE, "clientService");
runner.enableControllerService(clientService);
runner.assertValid();
runner.enqueue("{ \"msg\": \"Hello, world\" }");
runner.run();
runner.assertTransferCount(PutMongo.REL_SUCCESS, 1);
runner.assertTransferCount(PutMongo.REL_FAILURE, 0);
}
}