/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.nifi.processors.mongodb;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.mongodb.MongoDBClientService;
import org.apache.nifi.mongodb.MongoDBControllerService;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.bson.Document;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.regex.Pattern;
| |
| public class GetMongoIT { |
| private static final String MONGO_URI = "mongodb://localhost"; |
| private static final String DB_NAME = GetMongoIT.class.getSimpleName().toLowerCase(); |
| private static final String COLLECTION_NAME = "test"; |
| |
| private static final List<Document> DOCUMENTS; |
| private static final Calendar CAL; |
| |
| static { |
| CAL = Calendar.getInstance(); |
| DOCUMENTS = Lists.newArrayList( |
| new Document("_id", "doc_1").append("a", 1).append("b", 2).append("c", 3), |
| new Document("_id", "doc_2").append("a", 1).append("b", 2).append("c", 4).append("date_field", CAL.getTime()), |
| new Document("_id", "doc_3").append("a", 1).append("b", 3) |
| ); |
| } |
| |
| private TestRunner runner; |
| private MongoClient mongoClient; |
| |
| @BeforeEach |
| public void setup() { |
| runner = TestRunners.newTestRunner(GetMongo.class); |
| runner.setVariable("uri", MONGO_URI); |
| runner.setVariable("db", DB_NAME); |
| runner.setVariable("collection", COLLECTION_NAME); |
| runner.setProperty(AbstractMongoProcessor.URI, "${uri}"); |
| runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}"); |
| runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}"); |
| runner.setProperty(GetMongo.USE_PRETTY_PRINTING, GetMongo.YES_PP); |
| runner.setIncomingConnection(false); |
| |
| mongoClient = new MongoClient(new MongoClientURI(MONGO_URI)); |
| |
| MongoCollection<Document> collection = mongoClient.getDatabase(DB_NAME).getCollection(COLLECTION_NAME); |
| collection.insertMany(DOCUMENTS); |
| } |
| |
| @AfterEach |
| public void teardown() { |
| runner = null; |
| |
| mongoClient.getDatabase(DB_NAME).drop(); |
| } |
| |
| @Test |
| public void testValidators() { |
| |
| TestRunner runner = TestRunners.newTestRunner(GetMongo.class); |
| Collection<ValidationResult> results; |
| ProcessContext pc; |
| |
| // missing uri, db, collection |
| runner.enqueue(new byte[0]); |
| pc = runner.getProcessContext(); |
| results = new HashSet<>(); |
| if (pc instanceof MockProcessContext) { |
| results = ((MockProcessContext) pc).validate(); |
| } |
| Assertions.assertEquals(2, results.size()); |
| Iterator<ValidationResult> it = results.iterator(); |
| Assertions.assertTrue(it.next().toString().contains("is invalid because Mongo Database Name is required")); |
| Assertions.assertTrue(it.next().toString().contains("is invalid because Mongo Collection Name is required")); |
| |
| // missing query - is ok |
| runner.setProperty(AbstractMongoProcessor.URI, MONGO_URI); |
| runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DB_NAME); |
| runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME); |
| runner.enqueue(new byte[0]); |
| pc = runner.getProcessContext(); |
| results = new HashSet<>(); |
| if (pc instanceof MockProcessContext) { |
| results = ((MockProcessContext) pc).validate(); |
| } |
| Assertions.assertEquals(0, results.size()); |
| |
| // invalid query |
| runner.setProperty(GetMongo.QUERY, "{a: x,y,z}"); |
| runner.enqueue(new byte[0]); |
| pc = runner.getProcessContext(); |
| results = new HashSet<>(); |
| if (pc instanceof MockProcessContext) { |
| results = ((MockProcessContext) pc).validate(); |
| } |
| Assertions.assertEquals(1, results.size()); |
| Assertions.assertTrue(results.iterator().next().toString().contains("is invalid because")); |
| |
| // invalid projection |
| runner.setVariable("projection", "{a: x,y,z}"); |
| runner.setProperty(GetMongo.QUERY, "{\"a\": 1}"); |
| runner.setProperty(GetMongo.PROJECTION, "{a: z}"); |
| runner.enqueue(new byte[0]); |
| pc = runner.getProcessContext(); |
| results = new HashSet<>(); |
| if (pc instanceof MockProcessContext) { |
| results = ((MockProcessContext) pc).validate(); |
| } |
| Assertions.assertEquals(1, results.size()); |
| Assertions.assertTrue(results.iterator().next().toString().contains("is invalid")); |
| |
| // invalid sort |
| runner.removeProperty(GetMongo.PROJECTION); |
| runner.setProperty(GetMongo.SORT, "{a: x,y,z}"); |
| runner.enqueue(new byte[0]); |
| pc = runner.getProcessContext(); |
| results = new HashSet<>(); |
| if (pc instanceof MockProcessContext) { |
| results = ((MockProcessContext) pc).validate(); |
| } |
| Assertions.assertEquals(1, results.size()); |
| Assertions.assertTrue(results.iterator().next().toString().contains("is invalid")); |
| } |
| |
| @Test |
| public void testCleanJson() throws Exception { |
| runner.setVariable("query", "{\"_id\": \"doc_2\"}"); |
| runner.setProperty(GetMongo.QUERY, "${query}"); |
| runner.setProperty(GetMongo.JSON_TYPE, GetMongo.JSON_STANDARD); |
| runner.run(); |
| |
| runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1); |
| List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); |
| byte[] raw = runner.getContentAsByteArray(flowFiles.get(0)); |
| ObjectMapper mapper = new ObjectMapper(); |
| Map<String, Object> parsed = mapper.readValue(raw, Map.class); |
| SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd"); |
| |
| Assertions.assertTrue(parsed.get("date_field").getClass() == String.class); |
| Assertions.assertTrue(((String)parsed.get("date_field")).startsWith(format.format(CAL.getTime()))); |
| } |
| |
| @Test |
| public void testReadOneDocument() throws Exception { |
| runner.setVariable("query", "{a: 1, b: 3}"); |
| runner.setProperty(GetMongo.QUERY, "${query}"); |
| runner.run(); |
| |
| runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1); |
| List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); |
| flowFiles.get(0).assertContentEquals(DOCUMENTS.get(2).toJson()); |
| } |
| |
| @Test |
| public void testReadMultipleDocuments() throws Exception { |
| runner.setProperty(GetMongo.QUERY, "{\"a\": {\"$exists\": \"true\"}}"); |
| runner.run(); |
| runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 3); |
| |
| List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); |
| for (int i=0; i < flowFiles.size(); i++) { |
| flowFiles.get(i).assertContentEquals(DOCUMENTS.get(i).toJson()); |
| } |
| } |
| |
| @Test |
| public void testProjection() throws Exception { |
| runner.setProperty(GetMongo.QUERY, "{\"a\": 1, \"b\": 3}"); |
| runner.setProperty(GetMongo.PROJECTION, "{\"_id\": 0, \"a\": 1}"); |
| runner.run(); |
| |
| runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1); |
| List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); |
| Document expected = new Document("a", 1); |
| flowFiles.get(0).assertContentEquals(expected.toJson()); |
| } |
| |
| @Test |
| public void testSort() throws Exception { |
| runner.setVariable("sort", "{a: -1, b: -1, c: 1}"); |
| runner.setProperty(GetMongo.QUERY, "{\"a\": {\"$exists\": \"true\"}}"); |
| runner.setProperty(GetMongo.SORT, "${sort}"); |
| runner.run(); |
| |
| runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 3); |
| List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); |
| flowFiles.get(0).assertContentEquals(DOCUMENTS.get(2).toJson()); |
| flowFiles.get(1).assertContentEquals(DOCUMENTS.get(0).toJson()); |
| flowFiles.get(2).assertContentEquals(DOCUMENTS.get(1).toJson()); |
| } |
| |
| @Test |
| public void testLimit() throws Exception { |
| runner.setProperty(GetMongo.QUERY, "{\"a\": {\"$exists\": \"true\"}}"); |
| runner.setProperty(GetMongo.LIMIT, "${limit}"); |
| runner.setVariable("limit", "1"); |
| runner.run(); |
| |
| runner.assertAllFlowFilesTransferred(GetMongo.REL_SUCCESS, 1); |
| List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); |
| flowFiles.get(0).assertContentEquals(DOCUMENTS.get(0).toJson()); |
| } |
| |
| @Test |
| public void testResultsPerFlowfile() throws Exception { |
| runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "${results.per.flowfile}"); |
| runner.setVariable("results.per.flowfile", "2"); |
| runner.enqueue("{}"); |
| runner.setIncomingConnection(true); |
| runner.run(); |
| runner.assertTransferCount(GetMongo.REL_SUCCESS, 2); |
| runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1); |
| List<MockFlowFile> results = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); |
| Assertions.assertTrue(results.get(0).getSize() > 0, "Flowfile was empty"); |
| Assertions.assertEquals(results.get(0).getAttribute(CoreAttributes.MIME_TYPE.key()), "application/json", "Wrong mime type"); |
| } |
| |
| @Test |
| public void testBatchSize() throws Exception { |
| runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "2"); |
| runner.setProperty(GetMongo.BATCH_SIZE, "${batch.size}"); |
| runner.setVariable("batch.size", "1"); |
| runner.enqueue("{}"); |
| runner.setIncomingConnection(true); |
| runner.run(); |
| runner.assertTransferCount(GetMongo.REL_SUCCESS, 2); |
| runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1); |
| List<MockFlowFile> results = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); |
| Assertions.assertTrue(results.get(0).getSize() > 0, "Flowfile was empty"); |
| Assertions.assertEquals(results.get(0).getAttribute(CoreAttributes.MIME_TYPE.key()), "application/json", "Wrong mime type"); |
| } |
| |
| @Test |
| public void testConfigurablePrettyPrint() { |
| runner.setProperty(GetMongo.JSON_TYPE, GetMongo.JSON_STANDARD); |
| runner.setProperty(GetMongo.LIMIT, "1"); |
| runner.enqueue("{}"); |
| runner.setIncomingConnection(true); |
| runner.run(); |
| runner.assertTransferCount(GetMongo.REL_SUCCESS, 1); |
| runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1); |
| List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); |
| byte[] raw = runner.getContentAsByteArray(flowFiles.get(0)); |
| String json = new String(raw); |
| Assertions.assertTrue(json.contains("\n"), "JSON did not have new lines."); |
| runner.clearTransferState(); |
| runner.setProperty(GetMongo.USE_PRETTY_PRINTING, GetMongo.NO_PP); |
| runner.enqueue("{}"); |
| runner.run(); |
| runner.assertTransferCount(GetMongo.REL_SUCCESS, 1); |
| runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1); |
| flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); |
| raw = runner.getContentAsByteArray(flowFiles.get(0)); |
| json = new String(raw); |
| Assertions.assertFalse(json.contains("\n"), "New lines detected"); |
| } |
| |
| private void testQueryAttribute(String attr, String expected) { |
| List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); |
| for (MockFlowFile mff : flowFiles) { |
| String val = mff.getAttribute(attr); |
| Assertions.assertNotNull(val, "Missing query attribute"); |
| Assertions.assertEquals(expected, val, "Value was wrong"); |
| } |
| } |
| |
| @Test |
| public void testQueryAttribute() { |
| /* |
| * Test original behavior; Manually set query of {}, no input |
| */ |
| final String attr = "query.attr"; |
| final String queryValue = "{}"; |
| runner.setProperty(GetMongo.QUERY, queryValue); |
| runner.setProperty(GetMongo.QUERY_ATTRIBUTE, attr); |
| runner.run(); |
| runner.assertTransferCount(GetMongo.REL_SUCCESS, 3); |
| testQueryAttribute(attr, queryValue); |
| |
| runner.clearTransferState(); |
| |
| /* |
| * Test original behavior; No Input/Empty val = {} |
| */ |
| runner.removeProperty(GetMongo.QUERY); |
| runner.setIncomingConnection(false); |
| runner.run(); |
| testQueryAttribute(attr, queryValue); |
| |
| runner.clearTransferState(); |
| |
| /* |
| * Input flowfile with {} as the query |
| */ |
| |
| runner.setIncomingConnection(true); |
| runner.enqueue("{}"); |
| runner.run(); |
| testQueryAttribute(attr, queryValue); |
| |
| /* |
| * Input flowfile with invalid query |
| */ |
| |
| runner.clearTransferState(); |
| runner.enqueue("invalid query"); |
| runner.run(); |
| |
| runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0); |
| runner.assertTransferCount(GetMongo.REL_FAILURE, 1); |
| runner.assertTransferCount(GetMongo.REL_SUCCESS, 0); |
| } |
| |
| /* |
| * Query read behavior tests |
| */ |
| @Test |
| public void testReadQueryFromBodyWithEL() { |
| Map attributes = new HashMap(); |
| attributes.put("field", "c"); |
| attributes.put("value", "4"); |
| String query = "{ \"${field}\": { \"$gte\": ${value}}}"; |
| runner.setIncomingConnection(true); |
| runner.setProperty(GetMongo.QUERY, query); |
| runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "10"); |
| runner.enqueue("test", attributes); |
| runner.run(1, true, true); |
| |
| runner.assertTransferCount(GetMongo.REL_FAILURE, 0); |
| runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1); |
| runner.assertTransferCount(GetMongo.REL_SUCCESS, 1); |
| } |
| |
| @Test |
| public void testReadQueryFromBodyNoEL() { |
| String query = "{ \"c\": { \"$gte\": 4 }}"; |
| runner.setIncomingConnection(true); |
| runner.removeProperty(GetMongo.QUERY); |
| runner.enqueue(query); |
| runner.run(1, true, true); |
| |
| runner.assertTransferCount(GetMongo.REL_FAILURE, 0); |
| runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1); |
| runner.assertTransferCount(GetMongo.REL_SUCCESS, 1); |
| |
| } |
| |
| @Test |
| public void testReadQueryFromQueryParamNoConnection() { |
| String query = "{ \"c\": { \"$gte\": 4 }}"; |
| runner.setProperty(GetMongo.QUERY, query); |
| runner.setIncomingConnection(false); |
| runner.run(1, true, true); |
| runner.assertTransferCount(GetMongo.REL_FAILURE, 0); |
| runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0); |
| runner.assertTransferCount(GetMongo.REL_SUCCESS, 1); |
| |
| } |
| |
| @Test |
| public void testReadQueryFromQueryParamWithConnection() { |
| String query = "{ \"c\": { \"$gte\": ${value} }}"; |
| Map<String, String> attrs = new HashMap<>(); |
| attrs.put("value", "4"); |
| |
| runner.setProperty(GetMongo.QUERY, query); |
| runner.setIncomingConnection(true); |
| runner.enqueue("test", attrs); |
| runner.run(1, true, true); |
| runner.assertTransferCount(GetMongo.REL_FAILURE, 0); |
| runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1); |
| runner.assertTransferCount(GetMongo.REL_SUCCESS, 1); |
| } |
| |
| @Test |
| public void testQueryParamMissingWithNoFlowfile() { |
| Exception ex = null; |
| |
| try { |
| runner.assertValid(); |
| runner.setIncomingConnection(false); |
| runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "1"); |
| runner.run(1, true, true); |
| } catch (Exception pe) { |
| ex = pe; |
| } |
| |
| Assertions.assertNull(ex, "An exception was thrown!"); |
| runner.assertTransferCount(GetMongo.REL_FAILURE, 0); |
| runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0); |
| runner.assertTransferCount(GetMongo.REL_SUCCESS, 3); |
| } |
| |
| @Test |
| public void testReadCharsetWithEL() { |
| String query = "{ \"c\": { \"$gte\": 4 }}"; |
| Map<String, String> attrs = new HashMap<>(); |
| attrs.put("charset", "UTF-8"); |
| |
| runner.setProperty(GetMongo.CHARSET, "${charset}"); |
| runner.setProperty(GetMongo.BATCH_SIZE, "2"); |
| runner.setProperty(GetMongo.RESULTS_PER_FLOWFILE, "2"); |
| |
| runner.setIncomingConnection(true); |
| runner.enqueue(query, attrs); |
| runner.run(1, true, true); |
| runner.assertTransferCount(GetMongo.REL_FAILURE, 0); |
| runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1); |
| runner.assertTransferCount(GetMongo.REL_SUCCESS, 1); |
| } |
| |
| @Test |
| public void testKeepOriginalAttributes() { |
| final String query = "{ \"c\": { \"$gte\": 4 }}"; |
| final Map<String, String> attributesMap = new HashMap<>(1); |
| attributesMap.put("property.1", "value-1"); |
| |
| runner.setIncomingConnection(true); |
| runner.removeProperty(GetMongo.QUERY); |
| runner.enqueue(query, attributesMap); |
| |
| runner.run(1, true, true); |
| |
| runner.assertTransferCount(GetMongo.REL_FAILURE, 0); |
| runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1); |
| runner.assertTransferCount(GetMongo.REL_SUCCESS, 1); |
| |
| MockFlowFile flowFile = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS).get(0); |
| Assertions.assertTrue(flowFile.getAttributes().containsKey("property.1")); |
| flowFile.assertAttributeEquals("property.1", "value-1"); |
| } |
| /* |
| * End query read behavior tests |
| */ |
| |
| /* |
| * Verify that behavior described in NIFI-5305 actually works. This test is to ensure that |
| * if a user configures the processor to use EL for the database details (name and collection) that |
| * it can work against a flowfile. |
| */ |
| @Test |
| public void testDatabaseEL() { |
| runner.clearTransferState(); |
| runner.removeVariable("collection"); |
| runner.removeVariable("db"); |
| runner.setIncomingConnection(true); |
| |
| String[] collections = new String[] { "a", "b", "c" }; |
| String[] dbs = new String[] { "el_db_1", "el_db_2", "el_db_3" }; |
| String query = "{}"; |
| |
| for (int x = 0; x < collections.length; x++) { |
| MongoDatabase db = mongoClient.getDatabase(dbs[x]); |
| db.getCollection(collections[x]) |
| .insertOne(new Document().append("msg", "Hello, World")); |
| |
| Map<String, String> attrs = new HashMap<>(); |
| attrs.put("db", dbs[x]); |
| attrs.put("collection", collections[x]); |
| runner.enqueue(query, attrs); |
| runner.run(); |
| |
| db.drop(); |
| |
| runner.assertTransferCount(GetMongo.REL_SUCCESS, 1); |
| runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1); |
| runner.assertTransferCount(GetMongo.REL_FAILURE, 0); |
| runner.clearTransferState(); |
| } |
| |
| Map<String, Map<String, String>> vals = new HashMap<String, Map<String, String>>(){{ |
| put("Collection", new HashMap<String, String>(){{ |
| put("db", "getmongotest"); |
| put("collection", ""); |
| }}); |
| put("Database", new HashMap<String, String>(){{ |
| put("db", ""); |
| put("collection", "test"); |
| }}); |
| }}; |
| |
| TestRunner tmpRunner; |
| |
| for (Map.Entry<String, Map<String, String>> entry : vals.entrySet()) { |
| // Creating a new runner for each set of attributes map since every subsequent runs will attempt to take the top most enqueued FlowFile |
| tmpRunner = TestRunners.newTestRunner(GetMongo.class); |
| tmpRunner.setProperty(AbstractMongoProcessor.URI, MONGO_URI); |
| tmpRunner.setProperty(AbstractMongoProcessor.DATABASE_NAME, DB_NAME); |
| tmpRunner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, COLLECTION_NAME); |
| tmpRunner.setIncomingConnection(true); |
| |
| tmpRunner.enqueue("{ }", entry.getValue()); |
| |
| try { |
| tmpRunner.run(); |
| } catch (Throwable ex) { |
| Throwable cause = ex.getCause(); |
| Assertions.assertTrue(cause instanceof ProcessException); |
| Assertions.assertTrue(ex.getMessage().contains(entry.getKey()), entry.getKey()); |
| } |
| tmpRunner.clearTransferState(); |
| |
| } |
| } |
| |
| @Test |
| public void testDBAttributes() { |
| runner.enqueue("{}"); |
| runner.run(); |
| runner.assertTransferCount(GetMongo.REL_SUCCESS, 3); |
| List<MockFlowFile> ffs = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); |
| for (MockFlowFile ff : ffs) { |
| String db = ff.getAttribute(GetMongo.DB_NAME); |
| String col = ff.getAttribute(GetMongo.COL_NAME); |
| Assertions.assertNotNull(db); |
| Assertions.assertNotNull(col); |
| Assertions.assertEquals(DB_NAME, db); |
| Assertions.assertEquals(COLLECTION_NAME, col); |
| } |
| } |
| |
| @Test |
| public void testDateFormat() throws Exception { |
| runner.setIncomingConnection(true); |
| runner.setProperty(GetMongo.JSON_TYPE, GetMongo.JSON_STANDARD); |
| runner.setProperty(GetMongo.DATE_FORMAT, "yyyy-MM-dd"); |
| runner.enqueue("{ \"_id\": \"doc_2\" }"); |
| runner.run(); |
| |
| runner.assertTransferCount(GetMongo.REL_FAILURE, 0); |
| runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1); |
| runner.assertTransferCount(GetMongo.REL_SUCCESS, 1); |
| MockFlowFile ff = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS).get(0); |
| byte[] content = runner.getContentAsByteArray(ff); |
| String json = new String(content); |
| Map<String, Object> result = new ObjectMapper().readValue(json, Map.class); |
| |
| Pattern format = Pattern.compile("([\\d]{4})-([\\d]{2})-([\\d]{2})"); |
| |
| Assertions.assertTrue(result.containsKey("date_field")); |
| Assertions.assertTrue(format.matcher((String) result.get("date_field")).matches()); |
| } |
| |
| @Test |
| public void testClientService() throws Exception { |
| MongoDBClientService clientService = new MongoDBControllerService(); |
| runner.addControllerService("clientService", clientService); |
| runner.removeProperty(GetMongo.URI); |
| runner.setProperty(clientService, MongoDBControllerService.URI, MONGO_URI); |
| runner.setProperty(GetMongo.CLIENT_SERVICE, "clientService"); |
| runner.enableControllerService(clientService); |
| runner.assertValid(); |
| |
| runner.enqueue("{}"); |
| runner.run(); |
| runner.assertTransferCount(GetMongo.REL_SUCCESS, 3); |
| } |
| |
| @Test |
| public void testInvalidQueryGoesToFailure() { |
| //Test variable registry mode |
| runner.setVariable("badattribute", "<<?>>"); |
| runner.setProperty(GetMongo.QUERY, "${badattribute}"); |
| runner.run(); |
| runner.assertTransferCount(GetMongo.REL_FAILURE, 0); |
| runner.assertTransferCount(GetMongo.REL_SUCCESS, 0); |
| runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0); |
| |
| runner.clearTransferState(); |
| |
| //Test that it doesn't blow up with variable registry values holding a proper value |
| runner.setVariable("badattribute", "{}"); |
| runner.run(); |
| runner.assertTransferCount(GetMongo.REL_FAILURE, 0); |
| runner.assertTransferCount(GetMongo.REL_SUCCESS, 3); |
| runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0); |
| |
| runner.clearTransferState(); |
| |
| //Test a bad flowfile attribute |
| runner.setIncomingConnection(true); |
| runner.setProperty(GetMongo.QUERY, "${badfromff}"); |
| runner.enqueue("<<?>>", new HashMap<String, String>() {{ |
| put("badfromff", "{\"prop\":}"); |
| }}); |
| runner.run(); |
| runner.assertTransferCount(GetMongo.REL_FAILURE, 1); |
| runner.assertTransferCount(GetMongo.REL_SUCCESS, 0); |
| runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0); |
| |
| runner.clearTransferState(); |
| |
| //Test for regression on a good query from a flowfile attribute |
| runner.setIncomingConnection(true); |
| runner.setProperty(GetMongo.QUERY, "${badfromff}"); |
| runner.enqueue("<<?>>", new HashMap<String, String>() {{ |
| put("badfromff", "{}"); |
| }}); |
| runner.run(); |
| runner.assertTransferCount(GetMongo.REL_FAILURE, 0); |
| runner.assertTransferCount(GetMongo.REL_SUCCESS, 3); |
| runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1); |
| |
| runner.clearTransferState(); |
| runner.removeProperty(GetMongo.QUERY); |
| |
| //Test for regression against the body w/out any EL involved. |
| runner.enqueue("<<?>>"); |
| runner.run(); |
| runner.assertTransferCount(GetMongo.REL_FAILURE, 1); |
| runner.assertTransferCount(GetMongo.REL_SUCCESS, 0); |
| runner.assertTransferCount(GetMongo.REL_ORIGINAL, 0); |
| } |
| |
| public void testSendEmpty() throws Exception { |
| runner.setIncomingConnection(true); |
| runner.setProperty(GetMongo.SEND_EMPTY_RESULTS, "true"); |
| runner.setProperty(GetMongo.QUERY, "{ \"nothing\": true }"); |
| runner.assertValid(); |
| runner.enqueue(""); |
| runner.run(); |
| |
| runner.assertTransferCount(GetMongo.REL_ORIGINAL, 1); |
| runner.assertTransferCount(GetMongo.REL_SUCCESS, 1); |
| runner.assertTransferCount(GetMongo.REL_FAILURE, 0); |
| |
| List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(GetMongo.REL_SUCCESS); |
| MockFlowFile flowFile = flowFiles.get(0); |
| Assertions.assertEquals(0, flowFile.getSize()); |
| } |
| } |