blob: a304f5a22e48bb6a253acbf87eb6e5bd23539386 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.mongodb;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.mongodb.MongoDBClientService;
import org.apache.nifi.mongodb.MongoDBControllerService;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.bson.Document;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
public class GetMongoIT {
private static final String MONGO_URI = "mongodb://localhost";
private static final String DB_NAME = GetMongoIT.class.getSimpleName().toLowerCase();
private static final String COLLECTION_NAME = "test";
private static final List<Document> DOCUMENTS;
private static final Calendar CAL;
static {
CAL = Calendar.getInstance();
DOCUMENTS = Lists.newArrayList(
new Document("_id", "doc_1").append("a", 1).append("b", 2).append("c", 3),
new Document("_id", "doc_2").append("a", 1).append("b", 2).append("c", 4).append("date_field", CAL.getTime()),
new Document("_id", "doc_3").append("a", 1).append("b", 3)
);
}
private TestRunner runner;
private MongoClient mongoClient;
@BeforeEach
public void setup() {
runner = TestRunners.newTestRunner(GetMongo.class);
runner.setVariable("uri", MONGO_URI);
runner.setVariable("db", DB_NAME);
runner.setVariable("collection", COLLECTION_NAME);
runner.setProperty(AbstractMongoProcessor.URI, "${uri}");
runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}");
runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}");
runner.setProperty(GetMongo.USE_PRETTY_PRINTING, GetMongo.YES_PP);
runner.setIncomingConnection(false);
mongoClient = new MongoClient(new MongoClientURI(MONGO_URI));
MongoCollection<Document> collection = mongoClient.getDatabase(DB_NAME).getCollection(COLLECTION_NAME);
collection.insertMany(DOCUMENTS);
}
@AfterEach
public void teardown() {
runner = null;
mongoClient.getDatabase(DB_NAME).drop();
}
@Test
public void testValidators() {
TestRunner runner = TestRunners.newTestRunner(GetMongo.class);
Collection<ValidationResult> results;
ProcessContext pc;
// missing uri, db, collection
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
results = new HashSet<>();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assertions.assertEquals(2, results.size());
Iterator<ValidationResult> it = results.iterator();
Assertions.assertTrue(it.next().toString().contains("is invalid because Mongo Database Name is required"));
Assertions.assertTrue(it.next().toString().contains("is invalid because Mongo Collection Name is required"));
// missing query - is ok
runner.setProperty(AbstractMongoProcessor.URI, MONGO_URI);
runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DB_NAME);
runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME);
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
results = new HashSet<>();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assertions.assertEquals(0, results.size());
// invalid query
runner.setProperty(GetMongo.QUERY, "{a: x,y,z}");
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
results = new HashSet<>();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assertions.assertEquals(1, results.size());
Assertions.assertTrue(results.iterator().next().toString().contains("is invalid because"));
// invalid projection
runner.setVariable("projection", "{a: x,y,z}");
runner.setProperty(GetMongo.QUERY, "{\"a\": 1}");
runner.setProperty(GetMongo.PROJECTION, "{a: z}");
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
results = new HashSet<>();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assertions.assertEquals(1, results.size());
Assertions.assertTrue(results.iterator().next().toString().contains("is invalid"));
// invalid sort
runner.removeProperty(GetMongo.PROJECTION);
runner.setProperty(GetMongo.SORT, "{a: x,y,z}");
runner.enqueue(new byte[0]);
pc = runner.getProcessContext();
results = new HashSet<>();
if (pc instanceof MockProcessContext) {
results = ((MockProcessContext) pc).validate();
}
Assertions.assertEquals(1, results.size());
Assertions.assertTrue(results.iterator().next().toString().contains("is invalid"));
}
@Test
public void testCleanJson() throws Exception {
runner.setVariable("query", "{\"_id\": \"doc_2\"}");
runner.setProperty(GetMongo.QUERY, "${query}");
runner.setProperty(GetMongo.JSON_TYPE, GetMongo.JSON_STANDARD);
runner.run();
runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
byte[] raw = runner.getContentAsByteArray(flowFiles.get(0));
ObjectMapper mapper = new ObjectMapper();
Map<String, Object> parsed = mapper.readValue(raw, Map.class);
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
Assertions.assertTrue(parsed.get("date_field").getClass() == String.class);
Assertions.assertTrue(((String)parsed.get("date_field")).startsWith(format.format(CAL.getTime())));
}
@Test
public void testReadOneDocument() throws Exception {
runner.setVariable("query", "{a: 1, b: 3}");
runner.setProperty(GetMongo.QUERY, "${query}");
runner.run();
runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
flowFiles.get(0).assertContentEquals(DOCUMENTS.get(2).toJson());
}
@Test
public void testReadMultipleDocuments() throws Exception {
runner.setProperty(GetMongo.QUERY, "{\"a\": {\"$exists\": \"true\"}}");
runner.run();
runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 3);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
for (int i=0; i < flowFiles.size(); i++) {
flowFiles.get(i).assertContentEquals(DOCUMENTS.get(i).toJson());
}
}
@Test
public void testProjection() throws Exception {
runner.setProperty(GetMongo.QUERY, "{\"a\": 1, \"b\": 3}");
runner.setProperty(GetMongo.PROJECTION, "{\"_id\": 0, \"a\": 1}");
runner.run();
runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
Document expected = new Document("a", 1);
flowFiles.get(0).assertContentEquals(expected.toJson());
}
@Test
public void testSort() throws Exception {
runner.setVariable("sort", "{a: -1, b: -1, c: 1}");
runner.setProperty(GetMongo.QUERY, "{\"a\": {\"$exists\": \"true\"}}");
runner.setProperty(GetMongo.SORT, "${sort}");
runner.run();
runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 3);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
flowFiles.get(0).assertContentEquals(DOCUMENTS.get(2).toJson());
flowFiles.get(1).assertContentEquals(DOCUMENTS.get(0).toJson());
flowFiles.get(2).assertContentEquals(DOCUMENTS.get(1).toJson());
}
@Test
public void testLimit() throws Exception {
runner.setProperty(GetMongo.QUERY, "{\"a\": {\"$exists\": \"true\"}}");
runner.setProperty(GetMongo.LIMIT, "${limit}");
runner.setVariable("limit", "1");
runner.run();
runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
flowFiles.get(0).assertContentEquals(DOCUMENTS.get(0).toJson());
}
@Test
public void testResultsPerFlowfile() throws Exception {
runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "${results.per.flowfile}");
runner.setVariable("results.per.flowfile", "2");
runner.enqueue("{}");
runner.setIncomingConnection(true);
runner.run();
runner.assertTransferCount(GetMongo.REL_SUCCESS, 2);
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
List<MockFlowFile> results = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
Assertions.assertTrue(results.get(0).getSize() > 0, "Flowfile was empty");
Assertions.assertEquals(results.get(0).getAttribute(CoreAttributes.MIME_TYPE.key()), "application/json", "Wrong mime type");
}
@Test
public void testBatchSize() throws Exception {
runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "2");
runner.setProperty(GetMongo.BATCH_SIZE, "${batch.size}");
runner.setVariable("batch.size", "1");
runner.enqueue("{}");
runner.setIncomingConnection(true);
runner.run();
runner.assertTransferCount(GetMongo.REL_SUCCESS, 2);
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
List<MockFlowFile> results = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
Assertions.assertTrue(results.get(0).getSize() > 0, "Flowfile was empty");
Assertions.assertEquals(results.get(0).getAttribute(CoreAttributes.MIME_TYPE.key()), "application/json", "Wrong mime type");
}
@Test
public void testConfigurablePrettyPrint() {
runner.setProperty(GetMongo.JSON_TYPE, GetMongo.JSON_STANDARD);
runner.setProperty(GetMongo.LIMIT, "1");
runner.enqueue("{}");
runner.setIncomingConnection(true);
runner.run();
runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
byte[] raw = runner.getContentAsByteArray(flowFiles.get(0));
String json = new String(raw);
Assertions.assertTrue(json.contains("\n"), "JSON did not have new lines.");
runner.clearTransferState();
runner.setProperty(GetMongo.USE_PRETTY_PRINTING, GetMongo.NO_PP);
runner.enqueue("{}");
runner.run();
runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
raw = runner.getContentAsByteArray(flowFiles.get(0));
json = new String(raw);
Assertions.assertFalse(json.contains("\n"), "New lines detected");
}
private void testQueryAttribute(String attr, String expected) {
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
for (MockFlowFile mff : flowFiles) {
String val = mff.getAttribute(attr);
Assertions.assertNotNull(val, "Missing query attribute");
Assertions.assertEquals(expected, val, "Value was wrong");
}
}
@Test
public void testQueryAttribute() {
/*
* Test original behavior; Manually set query of {}, no input
*/
final String attr = "query.attr";
final String queryValue = "{}";
runner.setProperty(GetMongo.QUERY, queryValue);
runner.setProperty(GetMongo.QUERY_ATTRIBUTE, attr);
runner.run();
runner.assertTransferCount(GetMongo.REL_SUCCESS, 3);
testQueryAttribute(attr, queryValue);
runner.clearTransferState();
/*
* Test original behavior; No Input/Empty val = {}
*/
runner.removeProperty(GetMongo.QUERY);
runner.setIncomingConnection(false);
runner.run();
testQueryAttribute(attr, queryValue);
runner.clearTransferState();
/*
* Input flowfile with {} as the query
*/
runner.setIncomingConnection(true);
runner.enqueue("{}");
runner.run();
testQueryAttribute(attr, queryValue);
/*
* Input flowfile with invalid query
*/
runner.clearTransferState();
runner.enqueue("invalid query");
runner.run();
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0);
runner.assertTransferCount(GetMongo.REL_FAILURE, 1);
runner.assertTransferCount(GetMongo.REL_SUCCESS, 0);
}
/*
* Query read behavior tests
*/
@Test
public void testReadQueryFromBodyWithEL() {
Map attributes = new HashMap();
attributes.put("field", "c");
attributes.put("value", "4");
String query = "{ \"${field}\": { \"$gte\": ${value}}}";
runner.setIncomingConnection(true);
runner.setProperty(GetMongo.QUERY, query);
runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "10");
runner.enqueue("test", attributes);
runner.run(1, true, true);
runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
}
@Test
public void testReadQueryFromBodyNoEL() {
String query = "{ \"c\": { \"$gte\": 4 }}";
runner.setIncomingConnection(true);
runner.removeProperty(GetMongo.QUERY);
runner.enqueue(query);
runner.run(1, true, true);
runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
}
@Test
public void testReadQueryFromQueryParamNoConnection() {
String query = "{ \"c\": { \"$gte\": 4 }}";
runner.setProperty(GetMongo.QUERY, query);
runner.setIncomingConnection(false);
runner.run(1, true, true);
runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0);
runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
}
@Test
public void testReadQueryFromQueryParamWithConnection() {
String query = "{ \"c\": { \"$gte\": ${value} }}";
Map<String, String> attrs = new HashMap<>();
attrs.put("value", "4");
runner.setProperty(GetMongo.QUERY, query);
runner.setIncomingConnection(true);
runner.enqueue("test", attrs);
runner.run(1, true, true);
runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
}
@Test
public void testQueryParamMissingWithNoFlowfile() {
Exception ex = null;
try {
runner.assertValid();
runner.setIncomingConnection(false);
runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "1");
runner.run(1, true, true);
} catch (Exception pe) {
ex = pe;
}
Assertions.assertNull(ex, "An exception was thrown!");
runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0);
runner.assertTransferCount(GetMongo.REL_SUCCESS, 3);
}
@Test
public void testReadCharsetWithEL() {
String query = "{ \"c\": { \"$gte\": 4 }}";
Map<String, String> attrs = new HashMap<>();
attrs.put("charset", "UTF-8");
runner.setProperty(GetMongo.CHARSET, "${charset}");
runner.setProperty(GetMongo.BATCH_SIZE, "2");
runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "2");
runner.setIncomingConnection(true);
runner.enqueue(query, attrs);
runner.run(1, true, true);
runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
}
@Test
public void testKeepOriginalAttributes() {
final String query = "{ \"c\": { \"$gte\": 4 }}";
final Map<String, String> attributesMap = new HashMap<>(1);
attributesMap.put("property.1", "value-1");
runner.setIncomingConnection(true);
runner.removeProperty(GetMongo.QUERY);
runner.enqueue(query, attributesMap);
runner.run(1, true, true);
runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
MockFlowFile flowFile = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS).get(0);
Assertions.assertTrue(flowFile.getAttributes().containsKey("property.1"));
flowFile.assertAttributeEquals("property.1", "value-1");
}
/*
* End query read behavior tests
*/
/*
* Verify that behavior described in NIFI-5305 actually works. This test is to ensure that
* if a user configures the processor to use EL for the database details (name and collection) that
* it can work against a flowfile.
*/
@Test
public void testDatabaseEL() {
runner.clearTransferState();
runner.removeVariable("collection");
runner.removeVariable("db");
runner.setIncomingConnection(true);
String[] collections = new String[] { "a", "b", "c" };
String[] dbs = new String[] { "el_db_1", "el_db_2", "el_db_3" };
String query = "{}";
for (int x = 0; x < collections.length; x++) {
MongoDatabase db = mongoClient.getDatabase(dbs[x]);
db.getCollection(collections[x])
.insertOne(new Document().append("msg", "Hello, World"));
Map<String, String> attrs = new HashMap<>();
attrs.put("db", dbs[x]);
attrs.put("collection", collections[x]);
runner.enqueue(query, attrs);
runner.run();
db.drop();
runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
runner.clearTransferState();
}
Map<String, Map<String, String>> vals = new HashMap<String, Map<String, String>>(){{
put("Collection", new HashMap<String, String>(){{
put("db", "getmongotest");
put("collection", "");
}});
put("Database", new HashMap<String, String>(){{
put("db", "");
put("collection", "test");
}});
}};
TestRunner tmpRunner;
for (Map.Entry<String, Map<String, String>> entry : vals.entrySet()) {
// Creating a new runner for each set of attributes map since every subsequent runs will attempt to take the top most enqueued FlowFile
tmpRunner = TestRunners.newTestRunner(GetMongo.class);
tmpRunner.setProperty(AbstractMongoProcessor.URI, MONGO_URI);
tmpRunner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DB_NAME);
tmpRunner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME);
tmpRunner.setIncomingConnection(true);
tmpRunner.enqueue("{ }", entry.getValue());
try {
tmpRunner.run();
} catch (Throwable ex) {
Throwable cause = ex.getCause();
Assertions.assertTrue(cause instanceof ProcessException);
Assertions.assertTrue(ex.getMessage().contains(entry.getKey()), entry.getKey());
}
tmpRunner.clearTransferState();
}
}
@Test
public void testDBAttributes() {
runner.enqueue("{}");
runner.run();
runner.assertTransferCount(GetMongo.REL_SUCCESS, 3);
List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
for (MockFlowFile ff : ffs) {
String db = ff.getAttribute(GetMongo.DB_NAME);
String col = ff.getAttribute(GetMongo.COL_NAME);
Assertions.assertNotNull(db);
Assertions.assertNotNull(col);
Assertions.assertEquals(DB_NAME, db);
Assertions.assertEquals(COLLECTION_NAME, col);
}
}
@Test
public void testDateFormat() throws Exception {
runner.setIncomingConnection(true);
runner.setProperty(GetMongo.JSON_TYPE, GetMongo.JSON_STANDARD);
runner.setProperty(GetMongo.DATE_FORMAT, "yyyy-MM-dd");
runner.enqueue("{ \"_id\": \"doc_2\" }");
runner.run();
runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
MockFlowFile ff = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS).get(0);
byte[] content = runner.getContentAsByteArray(ff);
String json = new String(content);
Map<String, Object> result = new ObjectMapper().readValue(json, Map.class);
Pattern format = Pattern.compile("([\\d]{4})-([\\d]{2})-([\\d]{2})");
Assertions.assertTrue(result.containsKey("date_field"));
Assertions.assertTrue(format.matcher((String) result.get("date_field")).matches());
}
@Test
public void testClientService() throws Exception {
MongoDBClientService clientService = new MongoDBControllerService();
runner.addControllerService("clientService", clientService);
runner.removeProperty(GetMongo.URI);
runner.setProperty(clientService, MongoDBControllerService.URI, MONGO_URI);
runner.setProperty(GetMongo.CLIENT_SERVICE, "clientService");
runner.enableControllerService(clientService);
runner.assertValid();
runner.enqueue("{}");
runner.run();
runner.assertTransferCount(GetMongo.REL_SUCCESS, 3);
}
@Test
public void testInvalidQueryGoesToFailure() {
//Test variable registry mode
runner.setVariable("badattribute", "<<?>>");
runner.setProperty(GetMongo.QUERY, "${badattribute}");
runner.run();
runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
runner.assertTransferCount(GetMongo.REL_SUCCESS, 0);
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0);
runner.clearTransferState();
//Test that it doesn't blow up with variable registry values holding a proper value
runner.setVariable("badattribute", "{}");
runner.run();
runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
runner.assertTransferCount(GetMongo.REL_SUCCESS, 3);
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0);
runner.clearTransferState();
//Test a bad flowfile attribute
runner.setIncomingConnection(true);
runner.setProperty(GetMongo.QUERY, "${badfromff}");
runner.enqueue("<<?>>", new HashMap<String, String>() {{
put("badfromff", "{\"prop\":}");
}});
runner.run();
runner.assertTransferCount(GetMongo.REL_FAILURE, 1);
runner.assertTransferCount(GetMongo.REL_SUCCESS, 0);
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0);
runner.clearTransferState();
//Test for regression on a good query from a flowfile attribute
runner.setIncomingConnection(true);
runner.setProperty(GetMongo.QUERY, "${badfromff}");
runner.enqueue("<<?>>", new HashMap<String, String>() {{
put("badfromff", "{}");
}});
runner.run();
runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
runner.assertTransferCount(GetMongo.REL_SUCCESS, 3);
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
runner.clearTransferState();
runner.removeProperty(GetMongo.QUERY);
//Test for regression against the body w/out any EL involved.
runner.enqueue("<<?>>");
runner.run();
runner.assertTransferCount(GetMongo.REL_FAILURE, 1);
runner.assertTransferCount(GetMongo.REL_SUCCESS, 0);
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0);
}
public void testSendEmpty() throws Exception {
runner.setIncomingConnection(true);
runner.setProperty(GetMongo.SEND_EMPTY_RESULTS, "true");
runner.setProperty(GetMongo.QUERY, "{ \"nothing\": true }");
runner.assertValid();
runner.enqueue("");
runner.run();
runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1);
runner.assertTransferCount(GetMongo.REL_SUCCESS, 1);
runner.assertTransferCount(GetMongo.REL_FAILURE, 0);
List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS);
MockFlowFile flowFile = flowFiles.get(0);
Assertions.assertEquals(0, flowFile.getSize());
}
}