blob: d92aed34fbbfa3bbef7c9fecc2b495b5dfe8bdb0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License") you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.elasticsearch.integration
import org.apache.nifi.elasticsearch.ElasticSearchClientService
import org.apache.nifi.elasticsearch.ElasticSearchClientServiceImpl
import org.apache.nifi.elasticsearch.ElasticSearchLookupService
import org.apache.nifi.lookup.LookupFailureException
import org.apache.nifi.record.path.RecordPath
import org.apache.nifi.schema.access.SchemaAccessUtils
import org.apache.nifi.schemaregistry.services.SchemaRegistry
import org.apache.nifi.serialization.record.MapRecord
import org.apache.nifi.serialization.record.Record
import org.apache.nifi.serialization.record.RecordSchema
import org.apache.nifi.serialization.record.type.RecordDataType
import org.apache.nifi.util.TestRunner
import org.apache.nifi.util.TestRunners
import org.junit.Assert
import org.junit.Assume
import org.junit.Before
import org.junit.BeforeClass
import org.junit.Test
class ElasticSearchLookupService_IT {
private TestRunner runner
private ElasticSearchClientService service
private ElasticSearchLookupService lookupService
@BeforeClass
static void beforeAll() throws Exception {
Assume.assumeTrue("Elasticsearch integration-tests not setup", ElasticSearchClientService_IT.isElasticsearchSetup())
System.out.println(
String.format("%n%n%n%n%n%n%n%n%n%n%n%n%n%n%nTYPE: %s%nVERSION: %s%nFLAVOUR %s%n%n%n%n%n%n%n%n%n%n%n%n%n%n%n",
ElasticSearchClientService_IT.TYPE, ElasticSearchClientService_IT.VERSION, ElasticSearchClientService_IT.FLAVOUR)
)
}
@Before
void before() throws Exception {
runner = TestRunners.newTestRunner(TestControllerServiceProcessor.class)
service = new ElasticSearchClientServiceImpl()
lookupService = new ElasticSearchLookupService()
runner.addControllerService("Client Service", service)
runner.addControllerService("Lookup Service", lookupService)
runner.setProperty(service, ElasticSearchClientService.HTTP_HOSTS, "http://localhost:9400")
runner.setProperty(service, ElasticSearchClientService.CONNECT_TIMEOUT, "10000")
runner.setProperty(service, ElasticSearchClientService.SOCKET_TIMEOUT, "60000")
runner.setProperty(TestControllerServiceProcessor.CLIENT_SERVICE, "Client Service")
runner.setProperty(TestControllerServiceProcessor.LOOKUP_SERVICE, "Lookup Service")
runner.setProperty(lookupService, ElasticSearchLookupService.CLIENT_SERVICE, "Client Service")
runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "user_details")
setTypeOnLookupService()
try {
runner.enableControllerService(service)
runner.enableControllerService(lookupService)
} catch (Exception ex) {
ex.printStackTrace()
throw ex
}
}
void setTypeOnLookupService() {
if (ElasticSearchClientService_IT.TYPE != null) {
runner.setProperty(lookupService, ElasticSearchLookupService.TYPE, ElasticSearchClientService_IT.TYPE)
} else {
runner.removeProperty(lookupService, ElasticSearchLookupService.TYPE)
}
}
@Test
void testValidity() throws Exception {
setDefaultSchema()
runner.assertValid()
}
private void setDefaultSchema() throws Exception {
runner.disableControllerService(lookupService)
SchemaRegistry registry = new TestSchemaRegistry()
runner.addControllerService("registry", registry)
runner.setProperty(lookupService, SchemaAccessUtils.SCHEMA_REGISTRY, "registry")
runner.enableControllerService(registry)
runner.enableControllerService(lookupService)
}
@Test
void lookupById() {
def coordinates = [ _id: "2" ]
Optional<Record> result = lookupService.lookup(coordinates)
Assert.assertNotNull(result)
Assert.assertTrue(result.isPresent())
def record = result.get()
Assert.assertEquals("jane.doe@company.com", record.getAsString("email"))
Assert.assertEquals("098-765-4321", record.getAsString("phone"))
Assert.assertEquals("GHIJK", record.getAsString("accessKey"))
}
@Test
void testInvalidIdScenarios() {
def coordinates = [
[
_id: 1
],
[
_id: "1", "email": "john.smith@company.com"
]
]
coordinates.each { coordinate ->
def exception = null
try {
lookupService.lookup(coordinate)
} catch (Exception ex) {
exception = ex
}
Assert.assertNotNull(exception)
Assert.assertTrue(exception instanceof LookupFailureException)
}
}
@Test
void lookupByQuery() {
def coordinates = [ "phone": "098-765-4321", "email": "jane.doe@company.com" ]
Optional<Record> result = lookupService.lookup(coordinates)
Assert.assertNotNull(result)
Assert.assertTrue(result.isPresent())
def record = result.get()
Assert.assertEquals("jane.doe@company.com", record.getAsString("email"))
Assert.assertEquals("098-765-4321", record.getAsString("phone"))
Assert.assertEquals("GHIJK", record.getAsString("accessKey"))
}
@Test
void testNestedSchema() {
def coordinates = [
"subField.deeper.deepest.super_secret": "The sky is blue",
"subField.deeper.secretz": "Buongiorno, mondo!!",
"msg": "Hello, world"
]
runner.disableControllerService(lookupService)
runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "nested")
setTypeOnLookupService()
runner.enableControllerService(lookupService)
Optional<Record> response = lookupService.lookup(coordinates)
Assert.assertNotNull(response)
Assert.assertTrue(response.isPresent())
def rec = response.get()
Assert.assertEquals("Hello, world", rec.getValue("msg"))
def subRec = getSubRecord(rec, "subField")
Assert.assertNotNull(subRec)
def deeper = getSubRecord(subRec, "deeper")
Assert.assertNotNull(deeper)
def deepest = getSubRecord(deeper, "deepest")
Assert.assertNotNull(deepest)
Assert.assertEquals("The sky is blue", deepest.getAsString("super_secret"))
}
@Test
void testDetectedSchema() throws LookupFailureException {
runner.disableControllerService(lookupService)
runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "complex")
setTypeOnLookupService()
runner.enableControllerService(lookupService)
def coordinates = ["_id": "1" ]
Optional<Record> response = lookupService.lookup(coordinates)
Assert.assertNotNull(response)
Record rec = response.get()
Record subRec = getSubRecord(rec, "subField")
def r2 = new MapRecord(rec.schema, [:])
def path = RecordPath.compile("/subField/longField")
def result = path.evaluate(r2)
result.selectedFields.findFirst().get().updateValue(1234567890L)
Assert.assertNotNull(rec)
Assert.assertNotNull(subRec)
Assert.assertEquals("Hello, world", rec.getValue("msg"))
Assert.assertNotNull(rec.getValue("subField"))
Assert.assertEquals(new Long(100000), subRec.getValue("longField"))
Assert.assertEquals("2018-04-10T12:18:05Z", subRec.getValue("dateField"))
}
static Record getSubRecord(Record rec, String fieldName) {
RecordSchema schema = rec.schema
RecordSchema subSchema = ((RecordDataType)schema.getField(fieldName).get().dataType).childSchema
rec.getAsRecord(fieldName, subSchema)
}
@Test
void testMappings() {
runner.disableControllerService(lookupService)
runner.setProperty(lookupService, "\$.subField.longField", "/longField2")
runner.setProperty(lookupService, '$.subField.dateField', '/dateField2')
runner.setProperty(lookupService, ElasticSearchLookupService.INDEX, "nested")
setTypeOnLookupService()
runner.enableControllerService(lookupService)
def coordinates = ["msg": "Hello, world"]
def result = lookupService.lookup(coordinates)
Assert.assertTrue(result.isPresent())
def rec = result.get()
["dateField2": "2018-08-14T10:08:00Z", "longField2": 150000L].each { field ->
def value = rec.getValue(field.key)
Assert.assertEquals(field.value, value)
}
}
}