blob: 0944061571ba324851a160246a1f3be095ddfa68 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.mongodb;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.lookup.LookupFailureException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.bson.Document;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.sql.Timestamp;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class MongoDBLookupServiceIT {
private static final String DB_NAME = String.format("nifi_test-%d", Calendar.getInstance().getTimeInMillis());
private static final String COL_NAME = String.format("nifi_test-%d", Calendar.getInstance().getTimeInMillis());
private TestRunner runner;
private MongoDBLookupService service;
private MongoDBControllerService controllerService;
private MongoDatabase db;
private MongoCollection col;
@BeforeEach
public void before() throws Exception {
runner = TestRunners.newTestRunner(TestLookupServiceProcessor.class);
service = new MongoDBLookupService();
controllerService = new MongoDBControllerService();
runner.addControllerService("Client Service", service);
runner.addControllerService("Client Service 2", controllerService);
runner.setProperty(TestLookupServiceProcessor.CLIENT_SERVICE, "Client Service");
runner.setProperty(service, MongoDBLookupService.DATABASE_NAME, DB_NAME);
runner.setProperty(service, MongoDBLookupService.COLLECTION_NAME, COL_NAME);
runner.setProperty(controllerService, MongoDBControllerService.URI, "mongodb://localhost:27017");
runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "message");
runner.setProperty(service, MongoDBLookupService.CONTROLLER_SERVICE, "Client Service 2");
SchemaRegistry registry = new StubSchemaRegistry();
runner.addControllerService("registry", registry);
runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "");
runner.setProperty(service, SchemaAccessUtils.SCHEMA_REGISTRY, "registry");
runner.enableControllerService(registry);
runner.enableControllerService(controllerService);
runner.enableControllerService(service);
db = controllerService.getDatabase(DB_NAME);
col = db.getCollection(COL_NAME);
}
@AfterEach
public void after() {
db.drop();
controllerService.onDisable();
}
@Test
public void testInit() {
runner.assertValid(service);
}
@Test
public void testLookupSingle() throws Exception {
runner.disableControllerService(service);
runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "message");
runner.enableControllerService(service);
Document document = controllerService.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
col.insertOne(document);
Map<String, Object> criteria = new HashMap<>();
criteria.put("uuid", "x-y-z");
Optional result = service.lookup(criteria);
Assertions.assertNotNull(result.get(), "The value was null.");
Assertions.assertEquals("Hello, world", result.get(), "The value was wrong.");
Map<String, Object> clean = new HashMap<>();
clean.putAll(criteria);
col.deleteOne(new Document(clean));
try {
result = service.lookup(criteria);
} catch (LookupFailureException ex) {
Assertions.fail();
}
Assertions.assertTrue(!result.isPresent());
}
@Test
public void testWithSchemaRegistry() throws Exception {
runner.assertValid();
col.insertOne(new Document()
.append("username", "john.smith")
.append("password", "testing1234")
);
Map<String, Object> criteria = new HashMap<>();
criteria.put("username", "john.smith");
Map<String, String> context = new HashMap<>();
context.put("schema.name", "user");
Optional result = service.lookup(criteria, context);
Assertions.assertTrue(result.isPresent());
Assertions.assertNotNull(result.get());
MapRecord record = (MapRecord)result.get();
Assertions.assertEquals("john.smith", record.getAsString("username"));
Assertions.assertEquals("testing1234", record.getAsString("password"));
/*
* Test falling back on schema detection if a user doesn't specify the context argument
*/
result = service.lookup(criteria);
Assertions.assertTrue(result.isPresent());
Assertions.assertNotNull(result.get());
record = (MapRecord)result.get();
Assertions.assertEquals("john.smith", record.getAsString("username"));
Assertions.assertEquals("testing1234", record.getAsString("password"));
}
@Test
public void testSchemaTextStrategy() throws Exception {
byte[] contents = IOUtils.toByteArray(getClass().getResourceAsStream("/simple.avsc"));
runner.disableControllerService(service);
runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "");
runner.setProperty(service, MongoDBLookupService.PROJECTION, "{ \"_id\": 0 }");
runner.setProperty(service, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
runner.setProperty(service, SchemaAccessUtils.SCHEMA_TEXT, "${schema.text}");
runner.enableControllerService(service);
runner.assertValid();
col.insertOne(new Document().append("msg", "Testing1234"));
Map<String, Object> criteria = new HashMap<>();
criteria.put("msg", "Testing1234");
Map<String, String> attrs = new HashMap<>();
attrs.put("schema.text", new String(contents));
Optional results = service.lookup(criteria, attrs);
Assertions.assertNotNull(results);
Assertions.assertTrue(results.isPresent());
}
@Test
public void testLookupRecord() throws Exception {
runner.disableControllerService(service);
runner.setProperty(service, MongoDBLookupService.LOOKUP_VALUE_FIELD, "");
runner.setProperty(service, MongoDBLookupService.PROJECTION, "{ \"_id\": 0 }");
runner.enableControllerService(service);
Date d = new Date();
Timestamp ts = new Timestamp(new Date().getTime());
List list = Arrays.asList("a", "b", "c", "d", "e");
col.insertOne(new Document()
.append("uuid", "x-y-z")
.append("dateField", d)
.append("longField", 10000L)
.append("stringField", "Hello, world")
.append("timestampField", ts)
.append("decimalField", Double.MAX_VALUE / 2.0)
.append("subrecordField", new Document()
.append("nestedString", "test")
.append("nestedLong", new Long(1000)))
.append("arrayField", list)
);
Map<String, Object> criteria = new HashMap<>();
criteria.put("uuid", "x-y-z");
Optional result = service.lookup(criteria);
Assertions.assertNotNull(result.get(), "The value was null.");
Assertions.assertTrue(result.get() instanceof MapRecord, "The value was wrong.");
MapRecord record = (MapRecord)result.get();
RecordSchema subSchema = ((RecordDataType)record.getSchema().getField("subrecordField").get().getDataType()).getChildSchema();
Assertions.assertEquals("Hello, world", record.getValue("stringField"), "The value was wrong.");
Assertions.assertEquals("x-y-z", record.getValue("uuid"), "The value was wrong.");
Assertions.assertEquals(new Long(10000), record.getValue("longField"));
Assertions.assertEquals((Double.MAX_VALUE / 2.0), record.getValue("decimalField"));
Assertions.assertEquals(d, record.getValue("dateField"));
Assertions.assertEquals(ts.getTime(), ((Date)record.getValue("timestampField")).getTime());
Record subRecord = record.getAsRecord("subrecordField", subSchema);
Assertions.assertNotNull(subRecord);
Assertions.assertEquals("test", subRecord.getValue("nestedString"));
Assertions.assertEquals(new Long(1000), subRecord.getValue("nestedLong"));
Assertions.assertEquals(list, record.getValue("arrayField"));
Map<String, Object> clean = new HashMap<>();
clean.putAll(criteria);
col.deleteOne(new Document(clean));
try {
result = service.lookup(criteria);
} catch (LookupFailureException ex) {
Assertions.fail();
}
Assertions.assertTrue(!result.isPresent());
}
@Test
public void testServiceParameters() {
Document document = controllerService.convertJson("{ \"uuid\": \"x-y-z\", \"message\": \"Hello, world\" }");
col.insertOne(document);
Map<String, Object> criteria = new HashMap<>();
criteria.put("uuid", "x-y-z");
assertDoesNotThrow(() -> service.lookup(criteria));
assertThrows(LookupFailureException.class, () -> service.lookup(new HashMap<>()));
}
}