blob: 9da63958cd2129dd1211795d5e21e627cc54a8e4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.nifi.processors.mongodb;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.client.MongoCollection;
import org.apache.nifi.mongodb.MongoDBClientService;
import org.apache.nifi.mongodb.MongoDBControllerService;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.bson.Document;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class RunMongoAggregationIT {
// Connection string used both for the processor under test and for the seeding client.
private static final String MONGO_URI = "mongodb://localhost";
// Database name suffixed with the class-load timestamp; dropped again in teardown().
private static final String DB_NAME = String.format("agg_test-%s", Calendar.getInstance().getTimeInMillis());
// Collection that setup() seeds and that the processor is pointed at.
private static final String COLLECTION_NAME = "agg_test_data";
// Attribute name passed as the processor's QUERY_ATTRIBUTE property and read back from result flowfiles.
private static final String AGG_ATTR = "mongo.aggregation.query";
// Runner for RunMongoAggregation; recreated in setup() for every test.
private TestRunner runner;
// Client used by the test itself to seed and drop the database.
private MongoClient mongoClient;
// Maps each seeded "val" to the number of documents inserted with that value.
private Map<String, Integer> mappings;
// Timestamp written into the "date" field of every seeded document.
private Calendar now = Calendar.getInstance();
/**
 * Builds a fresh {@link TestRunner} for {@link RunMongoAggregation}, wires the
 * connection properties through variables, and seeds the test collection.
 *
 * <p>Seeding inserts 2 documents with val "a", 3 with "b" and 4 with "c"; every
 * document carries the same "date" taken from {@code now}. The per-value counts
 * are recorded in {@code mappings}.</p>
 */
@BeforeEach
public void setup() {
    runner = TestRunners.newTestRunner(RunMongoAggregation.class);

    // The processor properties reference these variables via expression language.
    runner.setVariable("uri", MONGO_URI);
    runner.setVariable("db", DB_NAME);
    runner.setVariable("collection", COLLECTION_NAME);
    runner.setProperty(AbstractMongoProcessor.URI, "${uri}");
    runner.setProperty(AbstractMongoProcessor.DATABASE_NAME, "${db}");
    runner.setProperty(AbstractMongoProcessor.COLLECTION_NAME, "${collection}");
    runner.setProperty(RunMongoAggregation.QUERY_ATTRIBUTE, AGG_ATTR);

    mongoClient = new MongoClient(new MongoClientURI(MONGO_URI));
    final MongoCollection<Document> seedTarget = mongoClient.getDatabase(DB_NAME).getCollection(COLLECTION_NAME);

    mappings = new HashMap<>();
    final String[] seedValues = {"a", "b", "c"};
    // The first value gets two documents; each following value gets one more.
    int copies = 2;
    for (final String seedValue : seedValues) {
        for (int inserted = 0; inserted < copies; inserted++) {
            seedTarget.insertOne(new Document().append("val", seedValue).append("date", now.getTime()));
        }
        mappings.put(seedValue, copies);
        copies++;
    }
}
/**
 * Releases the runner, drops the per-run test database and closes the seeding
 * client.
 *
 * <p>The client is closed in a {@code finally} block so the connection is
 * released even when the drop fails. If {@code setup()} failed before the
 * client was created there is nothing to clean up.</p>
 */
@AfterEach
public void teardown() {
    runner = null;
    if (mongoClient == null) {
        // setup() did not get as far as creating the client.
        return;
    }
    try {
        mongoClient.getDatabase(DB_NAME).drop();
    } finally {
        // A new client is opened for every test, so it must be closed every time.
        mongoClient.close();
        mongoClient = null;
    }
}
/**
 * Runs a {@code $project} + {@code $group} pipeline three times: once with an
 * incoming flowfile, then twice with the incoming connection disabled.
 *
 * <p>The first two runs are checked with {@code evaluateRunner}; after the last
 * run every result flowfile must carry the query text in the attribute named
 * by {@code AGG_ATTR}.</p>
 *
 * @throws Exception if reading flowfile content fails
 */
@Test
public void testAggregation() throws Exception {
    final String queryInput = "[\n" +
            " {\n" +
            " \"$project\": {\n" +
            " \"_id\": 0,\n" +
            " \"val\": 1\n" +
            " }\n" +
            " },\n" +
            " {\n" +
            " \"$group\": {\n" +
            " \"_id\": \"$val\",\n" +
            " \"doc_count\": {\n" +
            " \"$sum\": 1\n" +
            " }\n" +
            " }\n" +
            " }\n" +
            "]";
    runner.setProperty(RunMongoAggregation.QUERY, queryInput);
    runner.enqueue("test");
    runner.run(1, true, true);
    evaluateRunner(1);

    runner.clearTransferState();
    runner.setIncomingConnection(false);
    runner.run(); //Null parent flowfile
    evaluateRunner(0);

    runner.run();
    List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(RunMongoAggregation.REL_RESULTS);
    for (MockFlowFile mff : flowFiles) {
        String val = mff.getAttribute(AGG_ATTR);
        // JUnit 5 takes the value under test first and the failure message last.
        Assertions.assertNotNull(val, "Missing query attribute");
        Assertions.assertEquals(queryInput, val, "Value was wrong");
    }
}
/**
 * Runs the grouping pipeline with the {@code $group} key supplied through the
 * {@code fieldName} variable instead of being written literally in the query,
 * then checks the output with {@code evaluateRunner}.
 *
 * @throws Exception if reading flowfile content fails
 */
@Test
public void testExpressionLanguageSupport() throws Exception {
    // "${fieldName}" in the query below refers to this variable.
    runner.setVariable("fieldName", "$val");

    final String pipeline = "[\n" +
            " {\n" +
            " \"$project\": {\n" +
            " \"_id\": 0,\n" +
            " \"val\": 1\n" +
            " }\n" +
            " },\n" +
            " {\n" +
            " \"$group\": {\n" +
            " \"_id\": \"${fieldName}\",\n" +
            " \"doc_count\": {\n" +
            " \"$sum\": 1\n" +
            " }\n" +
            " }\n" +
            " }\n" +
            "]";
    runner.setProperty(RunMongoAggregation.QUERY, pipeline);

    runner.enqueue("test");
    runner.run(1, true, true);

    evaluateRunner(1);
}
/**
 * Submits a pipeline whose only stage is named {@code $invalid_stage} and
 * asserts that the single input flowfile is routed to failure, with nothing
 * sent to the results or original relationships.
 */
@Test
public void testInvalidQuery() {
    final String brokenPipeline = "[\n" +
            " {\n" +
            " \"$invalid_stage\": {\n" +
            " \"_id\": 0,\n" +
            " \"val\": 1\n" +
            " }\n" +
            " }\n" +
            "]";
    runner.setProperty(RunMongoAggregation.QUERY, brokenPipeline);

    runner.enqueue("test");
    runner.run(1, true, true);

    runner.assertTransferCount(RunMongoAggregation.REL_RESULTS, 0);
    runner.assertTransferCount(RunMongoAggregation.REL_ORIGINAL, 0);
    runner.assertTransferCount(RunMongoAggregation.REL_FAILURE, 1);
}
/**
 * Projects {@code [val, date]} into {@code myArray} and checks how the date is
 * rendered under both JSON type settings.
 *
 * <p>With {@code JSON_STANDARD} the second array element must equal the seeded
 * date formatted as {@code yyyy-MM-dd'T'HH:mm:ss'Z'} (case-insensitively); with
 * {@code JSON_EXTENDED} it must equal the seeded epoch milliseconds. Each pass
 * first asserts that results were produced, so the per-flowfile checks cannot
 * pass vacuously.</p>
 *
 * @throws IOException if a result flowfile cannot be parsed as JSON
 */
@Test
public void testJsonTypes() throws IOException {
    runner.setProperty(RunMongoAggregation.JSON_TYPE, RunMongoAggregation.JSON_STANDARD);
    runner.setProperty(RunMongoAggregation.QUERY, "[ { \"$project\": { \"myArray\": [ \"$val\", \"$date\" ] } } ]");
    runner.enqueue("test");
    runner.run(1, true, true);

    List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(RunMongoAggregation.REL_RESULTS);
    Assertions.assertFalse(flowFiles.isEmpty(), "No results produced for standard JSON");

    ObjectMapper mapper = new ObjectMapper();
    // NOTE(review): formats in the JVM default time zone despite the literal 'Z';
    // presumably this mirrors the processor's own formatter - confirm before changing.
    SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
    final String expectedDate = format.format(now.getTime());
    for (MockFlowFile mockFlowFile : flowFiles) {
        byte[] raw = runner.getContentAsByteArray(mockFlowFile);
        @SuppressWarnings("unchecked") // the projection above yields a map of string lists in this mode
        Map<String, List<String>> read = mapper.readValue(raw, Map.class);
        String actualDate = read.get("myArray").get(1);
        Assertions.assertTrue(expectedDate.equalsIgnoreCase(actualDate),
                "Expected date " + expectedDate + " but was " + actualDate);
    }

    runner.clearTransferState();
    runner.setProperty(RunMongoAggregation.JSON_TYPE, RunMongoAggregation.JSON_EXTENDED);
    runner.enqueue("test");
    runner.run(1, true, true);

    flowFiles = runner.getFlowFilesForRelationship(RunMongoAggregation.REL_RESULTS);
    Assertions.assertFalse(flowFiles.isEmpty(), "No results produced for extended JSON");

    final long expectedMillis = now.getTimeInMillis();
    for (MockFlowFile mockFlowFile : flowFiles) {
        byte[] raw = runner.getContentAsByteArray(mockFlowFile);
        @SuppressWarnings("unchecked") // the date element is read back as a numeric timestamp in this mode
        Map<String, List<Long>> read = mapper.readValue(raw, Map.class);
        long actualMillis = read.get("myArray").get(1);
        Assertions.assertEquals(expectedMillis, actualMillis, "Date was not returned as the seeded epoch millis");
    }
}
/**
 * Verifies the outcome of a grouping run: one result flowfile per seeded value,
 * the expected number of flowfiles on the original relationship, and on every
 * result a known {@code _id} plus a query attribute mentioning both pipeline
 * stages.
 *
 * @param original expected number of flowfiles routed to {@code REL_ORIGINAL}
 * @throws IOException if a result flowfile cannot be parsed as JSON
 */
private void evaluateRunner(int original) throws IOException {
    runner.assertTransferCount(RunMongoAggregation.REL_RESULTS, mappings.size());
    runner.assertTransferCount(RunMongoAggregation.REL_ORIGINAL, original);
    List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(RunMongoAggregation.REL_RESULTS);
    ObjectMapper mapper = new ObjectMapper();
    for (MockFlowFile mockFlowFile : flowFiles) {
        byte[] raw = runner.getContentAsByteArray(mockFlowFile);
        Map<?, ?> read = mapper.readValue(raw, Map.class);
        Assertions.assertTrue(mappings.containsKey(read.get("_id")), "Value was not found");
        String queryAttr = mockFlowFile.getAttribute(AGG_ATTR);
        // JUnit 5 takes the value under test first and the failure message last.
        Assertions.assertNotNull(queryAttr, "Query attribute was null.");
        Assertions.assertTrue(queryAttr.contains("$project"), "Missing $project");
        Assertions.assertTrue(queryAttr.contains("$group"), "Missing $group");
    }
}
/**
 * Connects through a {@link MongoDBControllerService} instead of the processor's
 * own URI property and asserts that a plain {@code $project} pipeline yields
 * 9 result flowfiles.
 *
 * @throws Exception if the controller service cannot be registered
 */
@Test
public void testClientService() throws Exception {
    final String serviceId = "clientService";
    final String projectOnly = "[\n" +
            " {\n" +
            " \"$project\": {\n" +
            " \"_id\": 0,\n" +
            " \"val\": 1\n" +
            " }\n" +
            " }]";

    final MongoDBClientService mongoService = new MongoDBControllerService();
    runner.addControllerService(serviceId, mongoService);
    // The URI now comes from the controller service rather than the processor.
    runner.removeProperty(RunMongoAggregation.URI);
    runner.setProperty(mongoService, MongoDBControllerService.URI, MONGO_URI);
    runner.setProperty(RunMongoAggregation.CLIENT_SERVICE, serviceId);
    runner.setProperty(RunMongoAggregation.QUERY, projectOnly);
    runner.enableControllerService(mongoService);
    runner.assertValid();

    runner.enqueue("{}");
    runner.run();

    runner.assertTransferCount(RunMongoAggregation.REL_RESULTS, 9);
}
/**
 * Runs a pipeline whose {@code $match} stage bounds the "date" field with
 * {@code $date} values, from 2019-01-01 up to one week after the seeded
 * timestamp, and asserts one grouped result per seeded value.
 *
 * @throws Exception declared for parity with the other tests
 */
@Test
public void testExtendedJsonSupport() throws Exception {
    final SimpleDateFormat isoLike = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'");
    // Upper bound sits a week past the seeded timestamp so the bound is never
    // too close to the documents' creation date.
    final int weekInMillis = 7 * 24 * 60 * 60 * 1000;
    final Date upperBound = new Date(now.getTimeInMillis() + weekInMillis);
    final String upperBoundText = isoLike.format(upperBound);

    final String queryInput = "[\n" +
            " {\n" +
            " \"$match\": {\n" +
            " \"date\": { \"$gte\": { \"$date\": \"2019-01-01T00:00:00Z\" }, \"$lte\": { \"$date\": \"" + upperBoundText + "\" } }\n" +
            " }\n" +
            " },\n" +
            " {\n" +
            " \"$group\": {\n" +
            " \"_id\": \"$val\",\n" +
            " \"doc_count\": {\n" +
            " \"$sum\": 1\n" +
            " }\n" +
            " }\n" +
            " }\n" +
            "]\n";
    runner.setProperty(RunMongoAggregation.QUERY, queryInput);

    runner.enqueue("test");
    runner.run(1, true, true);

    runner.assertTransferCount(RunMongoAggregation.REL_RESULTS, mappings.size());
}
/**
 * Runs a pipeline whose {@code $match} stage selects a value that setup never
 * inserts ("no_exists") and asserts the routing: one flowfile to original,
 * none to failure, and one to results.
 *
 * @throws Exception declared for parity with the other tests
 */
@Test
public void testEmptyResponse() throws Exception {
    final String noMatchPipeline = "[\n" +
            " {\n" +
            " \"$match\": {\n" +
            " \"val\": \"no_exists\"\n" +
            " }\n" +
            " },\n" +
            " {\n" +
            " \"$group\": {\n" +
            " \"_id\": \"null\",\n" +
            " \"doc_count\": {\n" +
            " \"$sum\": 1\n" +
            " }\n" +
            " }\n" +
            " }\n" +
            "]";
    runner.setProperty(RunMongoAggregation.QUERY, noMatchPipeline);

    runner.enqueue("test");
    runner.run(1, true, true);

    runner.assertTransferCount(RunMongoAggregation.REL_ORIGINAL, 1);
    runner.assertTransferCount(RunMongoAggregation.REL_FAILURE, 0);
    runner.assertTransferCount(RunMongoAggregation.REL_RESULTS, 1);
}
}