// blob: 85aa0b6bba6763e2f221b4356294aef24f4df65f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.lookup.RecordLookupService;
import org.apache.nifi.lookup.StringLookupService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.inference.SchemaInferenceUtil;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static org.junit.Assert.assertTrue;
public class TestLookupRecord {
private TestRunner runner;
private MapLookup lookupService;
private MockRecordParser recordReader;
private MockRecordWriter recordWriter;
/**
 * Builds a fresh {@link LookupRecord} runner before every test.
 *
 * The runner is wired to a mock reader, a mock writer and a string-valued {@link MapLookup}. The reader
 * is seeded with three people whose {@code sport} field is {@code null}; the processor is configured to
 * look up {@code /name}, store the result in {@code /sport} and route to matched/unmatched.
 */
@Before
public void setup() throws InitializationException {
    // Reader: three-field schema plus three records that still lack a sport.
    recordReader = new MockRecordParser();
    recordReader.addSchemaField("name", RecordFieldType.STRING);
    recordReader.addSchemaField("age", RecordFieldType.INT);
    recordReader.addSchemaField("sport", RecordFieldType.STRING);
    recordReader.addRecord("John Doe", 48, null);
    recordReader.addRecord("Jane Doe", 47, null);
    recordReader.addRecord("Jimmy Doe", 14, null);

    recordWriter = new MockRecordWriter(null, false);
    lookupService = new MapLookup();

    // Register and enable the three controller services under the ids the processor refers to.
    runner = TestRunners.newTestRunner(LookupRecord.class);
    runner.addControllerService("reader", recordReader);
    runner.enableControllerService(recordReader);
    runner.addControllerService("writer", recordWriter);
    runner.enableControllerService(recordWriter);
    runner.addControllerService("lookup", lookupService);
    runner.enableControllerService(lookupService);

    // Processor configuration shared by most tests; individual tests override what they need.
    runner.setProperty(LookupRecord.RECORD_READER, "reader");
    runner.setProperty(LookupRecord.RECORD_WRITER, "writer");
    runner.setProperty(LookupRecord.LOOKUP_SERVICE, "lookup");
    runner.setProperty("lookup", "/name");
    runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/sport");
    runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_MATCHED_UNMATCHED);
}
/**
 * Enqueues a FlowFile carrying two attributes and tells {@link MapLookup} to expect them: whenever the
 * service is invoked with a context, it asserts that each expected entry is present with the same value.
 * All three names are known to the service, so a single matched FlowFile is expected.
 */
@Test
public void testFlowfileAttributesPassed() {
    final Map<String, String> flowFileAttributes = new HashMap<>();
    flowFileAttributes.put("schema.name", "person");
    flowFileAttributes.put("something_something", "test");

    // The lookup service checks that at least these entries show up in its context.
    final Map<String, Object> expectedContext = new HashMap<>(flowFileAttributes);
    lookupService.setExpectedContext(expectedContext);

    lookupService.addValue("John Doe", "Soccer");
    lookupService.addValue("Jane Doe", "Basketball");
    lookupService.addValue("Jimmy Doe", "Football");

    runner.enqueue("", flowFileAttributes);
    runner.run();

    runner.assertAllFlowFilesTransferred(LookupRecord.REL_MATCHED, 1);
    final List<MockFlowFile> matchedFlowFiles = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED);
    final MockFlowFile result = matchedFlowFiles.get(0);
    result.assertAttributeEquals("record.count", "3");
    result.assertAttributeEquals("mime.type", "text/plain");
    result.assertContentEquals("John Doe,48,Soccer\nJane Doe,47,Basketball\nJimmy Doe,14,Football\n");
}
/**
 * Every record's name is present in the lookup service, so one FlowFile holding all three enriched
 * records is expected on the matched relationship.
 */
@Test
public void testAllMatch() {
    lookupService.addValue("John Doe", "Soccer");
    lookupService.addValue("Jane Doe", "Basketball");
    lookupService.addValue("Jimmy Doe", "Football");

    runner.enqueue("");
    runner.run();

    runner.assertAllFlowFilesTransferred(LookupRecord.REL_MATCHED, 1);
    final List<MockFlowFile> matchedFlowFiles = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED);
    final MockFlowFile result = matchedFlowFiles.get(0);
    result.assertAttributeEquals("record.count", "3");
    result.assertAttributeEquals("mime.type", "text/plain");
    result.assertContentEquals("John Doe,48,Soccer\nJane Doe,47,Basketball\nJimmy Doe,14,Football\n");
}
/**
 * The lookup service is left empty, so all three records are expected, unchanged, in a single FlowFile
 * on the unmatched relationship.
 */
@Test
public void testAllUnmatched() {
    runner.enqueue("");
    runner.run();

    runner.assertAllFlowFilesTransferred(LookupRecord.REL_UNMATCHED, 1);
    final List<MockFlowFile> unmatchedFlowFiles = runner.getFlowFilesForRelationship(LookupRecord.REL_UNMATCHED);
    final MockFlowFile result = unmatchedFlowFiles.get(0);
    result.assertAttributeEquals("record.count", "3");
    result.assertAttributeEquals("mime.type", "text/plain");
    result.assertContentEquals("John Doe,48,\nJane Doe,47,\nJimmy Doe,14,\n");
}
/**
 * Only two of the three names are known to the lookup service. The two enriched records are expected
 * together on matched, and the remaining record on unmatched; nothing may go to failure.
 */
@Test
public void testMixtureOfMatch() {
    // "Jane Doe" is deliberately left out of the lookup table.
    lookupService.addValue("John Doe", "Soccer");
    lookupService.addValue("Jimmy Doe", "Football");

    runner.enqueue("");
    runner.run();

    runner.assertTransferCount(LookupRecord.REL_FAILURE, 0);
    runner.assertTransferCount(LookupRecord.REL_MATCHED, 1);
    runner.assertTransferCount(LookupRecord.REL_UNMATCHED, 1);

    final List<MockFlowFile> matchedFlowFiles = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED);
    final MockFlowFile matched = matchedFlowFiles.get(0);
    matched.assertAttributeEquals("record.count", "2");
    matched.assertAttributeEquals("mime.type", "text/plain");
    matched.assertContentEquals("John Doe,48,Soccer\nJimmy Doe,14,Football\n");

    final List<MockFlowFile> unmatchedFlowFiles = runner.getFlowFilesForRelationship(LookupRecord.REL_UNMATCHED);
    final MockFlowFile unmatched = unmatchedFlowFiles.get(0);
    unmatched.assertAttributeEquals("record.count", "1");
    unmatched.assertAttributeEquals("mime.type", "text/plain");
    unmatched.assertContentEquals("Jane Doe,47,\n");
}
/**
 * Points the result path at {@code /other}, a field the incoming schema does not declare. The expected
 * output keeps the empty {@code sport} column and carries the looked-up value in an additional column.
 */
@Test
public void testResultPathNotFound() {
    lookupService.addValue("John Doe", "Soccer");
    lookupService.addValue("Jane Doe", "Basketball");
    lookupService.addValue("Jimmy Doe", "Football");

    // Override the "/sport" result path configured in setup().
    runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/other");

    runner.enqueue("");
    runner.run();

    runner.assertAllFlowFilesTransferred(LookupRecord.REL_MATCHED, 1);
    final List<MockFlowFile> matchedFlowFiles = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED);
    final MockFlowFile result = matchedFlowFiles.get(0);
    result.assertAttributeEquals("record.count", "3");
    result.assertAttributeEquals("mime.type", "text/plain");
    result.assertContentEquals("John Doe,48,,Soccer\nJane Doe,47,,Basketball\nJimmy Doe,14,,Football\n");
}
/**
 * Points the lookup coordinate at {@code /other}, a field the records do not contain. All records are
 * expected, unchanged, on the unmatched relationship.
 */
@Test
public void testLookupPathNotFound() {
    // Override the "/name" lookup path configured in setup().
    runner.setProperty("lookup", "/other");

    runner.enqueue("");
    runner.run();

    runner.assertAllFlowFilesTransferred(LookupRecord.REL_UNMATCHED, 1);
    final List<MockFlowFile> unmatchedFlowFiles = runner.getFlowFilesForRelationship(LookupRecord.REL_UNMATCHED);
    final MockFlowFile result = unmatchedFlowFiles.get(0);
    result.assertAttributeEquals("record.count", "3");
    result.assertAttributeEquals("mime.type", "text/plain");
    result.assertContentEquals("John Doe,48,\nJane Doe,47,\nJimmy Doe,14,\n");
}
/**
 * Configures the mock reader with {@code failAfter(1)} and expects the incoming FlowFile on the failure
 * relationship, with its original (empty) content and without a {@code record.count} attribute.
 */
@Test
public void testUnparseableData() {
    recordReader.failAfter(1);

    runner.enqueue("");
    runner.run();

    runner.assertAllFlowFilesTransferred(LookupRecord.REL_FAILURE, 1);
    final List<MockFlowFile> failedFlowFiles = runner.getFlowFilesForRelationship(LookupRecord.REL_FAILURE);
    final MockFlowFile failed = failedFlowFiles.get(0);
    failed.assertAttributeNotExists("record.count");
    failed.assertContentEquals("");
}
/**
 * Removes the result record path while every name is still known to the lookup service. The records are
 * expected on matched, but with their content unmodified because there is nowhere to write the result.
 */
@Test
public void testNoResultPath() {
    // Drop the "/sport" result path configured in setup().
    runner.removeProperty(LookupRecord.RESULT_RECORD_PATH);

    lookupService.addValue("John Doe", "Soccer");
    lookupService.addValue("Jane Doe", "Basketball");
    lookupService.addValue("Jimmy Doe", "Football");

    runner.enqueue("");
    runner.run();

    runner.assertAllFlowFilesTransferred(LookupRecord.REL_MATCHED, 1);
    final List<MockFlowFile> matchedFlowFiles = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED);
    final MockFlowFile result = matchedFlowFiles.get(0);
    result.assertAttributeEquals("record.count", "3");
    result.assertAttributeEquals("mime.type", "text/plain");
    result.assertContentEquals("John Doe,48,\nJane Doe,47,\nJimmy Doe,14,\n");
}
/**
 * Uses the wildcard path {@code /*} as the lookup coordinate. Even though every name is known to the
 * lookup service, all records are expected, unchanged, on the unmatched relationship.
 */
@Test
public void testMultipleLookupPaths() {
    lookupService.addValue("John Doe", "Soccer");
    lookupService.addValue("Jane Doe", "Basketball");
    lookupService.addValue("Jimmy Doe", "Football");

    // Override the "/name" lookup path configured in setup() with a wildcard.
    runner.setProperty("lookup", "/*");

    runner.enqueue("");
    runner.run();

    runner.assertAllFlowFilesTransferred(LookupRecord.REL_UNMATCHED, 1);
    final List<MockFlowFile> unmatchedFlowFiles = runner.getFlowFilesForRelationship(LookupRecord.REL_UNMATCHED);
    final MockFlowFile result = unmatchedFlowFiles.get(0);
    result.assertAttributeEquals("record.count", "3");
    result.assertAttributeEquals("mime.type", "text/plain");
    result.assertContentEquals("John Doe,48,\nJane Doe,47,\nJimmy Doe,14,\n");
}
/**
 * Walks the processor through three configurations of the dynamic {@code lookup} property, which is the
 * only key {@link MapLookup} reports as required: absent (invalid), set to {@code xx} (invalid) and set
 * to {@code /name} (valid).
 */
@Test
public void testInvalidUnlessAllRequiredPropertiesAdded() {
    // Replace the required "lookup" coordinate with an unrelated dynamic property.
    final PropertyDescriptor lookupCoordinate = new PropertyDescriptor.Builder().name("lookup").build();
    runner.removeProperty(lookupCoordinate);
    runner.setProperty("hello", "/name");
    runner.assertNotValid();

    // The required key is present again, but its value is still rejected.
    runner.setProperty("lookup", "xx");
    runner.assertNotValid();

    runner.setProperty("lookup", "/name");
    runner.assertValid();
}
/**
 * With {@code RESULT_RECORD_FIELDS} and a result path of {@code /}, the fields of the looked-up Record are
 * expected to be written into the incoming record itself, replacing the {@code favorite} and
 * {@code least} values it arrived with.
 */
@Test
public void testAddFieldsToExistingRecord() throws InitializationException {
    // Swap the string lookup registered in setup() for a Record-valued one under the same id.
    final RecordLookup recordLookup = new RecordLookup();
    runner.addControllerService("lookup", recordLookup);
    runner.enableControllerService(recordLookup);

    final List<RecordField> fields = new ArrayList<>();
    fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType()));
    fields.add(new RecordField("least", RecordFieldType.STRING.getDataType()));
    final RecordSchema schema = new SimpleRecordSchema(fields);
    final Record sports = new MapRecord(schema, new HashMap<>());
    sports.setValue("favorite", "basketball");
    sports.setValue("least", "soccer");
    recordLookup.addValue("John Doe", sports);

    // The incoming record already declares both fields; one is null, the other holds a stale value.
    recordReader = new MockRecordParser();
    recordReader.addSchemaField("name", RecordFieldType.STRING);
    recordReader.addSchemaField("age", RecordFieldType.INT);
    recordReader.addSchemaField("favorite", RecordFieldType.STRING);
    recordReader.addSchemaField("least", RecordFieldType.STRING);
    recordReader.addRecord("John Doe", 48, null, "baseball");
    runner.addControllerService("reader", recordReader);
    runner.enableControllerService(recordReader);

    runner.setProperty("lookup", "/name");
    runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/");
    runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS);

    runner.enqueue("");
    runner.run();

    // Report a routing problem as an assertion failure instead of an IndexOutOfBoundsException below.
    runner.assertAllFlowFilesTransferred(LookupRecord.REL_MATCHED, 1);
    final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);
    out.assertContentEquals("John Doe,48,basketball,soccer\n");
}
/**
 * If the output fields are added to a record that doesn't exist, the result should be that a Record is
 * created and the results added to it.
 *
 * Here the incoming {@code sport} field is declared as a RECORD but holds {@code null}.
 */
@Test
public void testAddFieldsToNonExistentRecord() throws InitializationException {
    // Swap the string lookup registered in setup() for a Record-valued one under the same id.
    final RecordLookup recordLookup = new RecordLookup();
    runner.addControllerService("lookup", recordLookup);
    runner.enableControllerService(recordLookup);

    final List<RecordField> fields = new ArrayList<>();
    fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType()));
    fields.add(new RecordField("least", RecordFieldType.STRING.getDataType()));
    final RecordSchema schema = new SimpleRecordSchema(fields);
    final Record sports = new MapRecord(schema, new HashMap<>());
    sports.setValue("favorite", "basketball");
    sports.setValue("least", "soccer");
    recordLookup.addValue("John Doe", sports);

    recordReader = new MockRecordParser();
    recordReader.addSchemaField("name", RecordFieldType.STRING);
    recordReader.addSchemaField("age", RecordFieldType.INT);
    recordReader.addSchemaField("sport", RecordFieldType.RECORD);
    recordReader.addRecord("John Doe", 48, null);
    runner.addControllerService("reader", recordReader);
    runner.enableControllerService(recordReader);

    runner.setProperty("lookup", "/name");
    runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/sport");
    runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS);

    runner.enqueue("");
    runner.run();

    // Report a routing problem as an assertion failure instead of an IndexOutOfBoundsException below.
    runner.assertAllFlowFilesTransferred(LookupRecord.REL_MATCHED, 1);
    final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);

    // Decode with an explicit charset so the comparison does not depend on the platform default.
    final String outputContents = new String(out.toByteArray(), StandardCharsets.UTF_8);
    // We can't be sure of the order of the fields in the record, so we allow either 'least' or 'favorite' to be first
    assertTrue(outputContents.equals("John Doe,48,MapRecord[{favorite=basketball, least=soccer}]\n")
        || outputContents.equals("John Doe,48,MapRecord[{least=soccer, favorite=basketball}]\n"));
}
/**
 * If the output fields are added to a non-record field, then the result should be that the field
 * becomes a UNION that does allow the Record and the value is set to a Record.
 *
 * Here the incoming {@code sport} field is declared as a STRING and holds {@code null}.
 */
@Test
public void testAddFieldsToNonRecordField() throws InitializationException {
    // Swap the string lookup registered in setup() for a Record-valued one under the same id.
    final RecordLookup recordLookup = new RecordLookup();
    runner.addControllerService("lookup", recordLookup);
    runner.enableControllerService(recordLookup);

    final List<RecordField> fields = new ArrayList<>();
    fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType()));
    fields.add(new RecordField("least", RecordFieldType.STRING.getDataType()));
    final RecordSchema schema = new SimpleRecordSchema(fields);
    final Record sports = new MapRecord(schema, new HashMap<>());
    sports.setValue("favorite", "basketball");
    sports.setValue("least", "soccer");
    recordLookup.addValue("John Doe", sports);

    recordReader = new MockRecordParser();
    recordReader.addSchemaField("name", RecordFieldType.STRING);
    recordReader.addSchemaField("age", RecordFieldType.INT);
    recordReader.addSchemaField("sport", RecordFieldType.STRING);
    recordReader.addRecord("John Doe", 48, null);
    runner.addControllerService("reader", recordReader);
    runner.enableControllerService(recordReader);

    runner.setProperty("lookup", "/name");
    runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/sport");
    runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS);

    runner.enqueue("");
    runner.run();

    // Report a routing problem as an assertion failure instead of an IndexOutOfBoundsException below.
    runner.assertAllFlowFilesTransferred(LookupRecord.REL_MATCHED, 1);
    final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_MATCHED).get(0);

    // Decode with an explicit charset so the comparison does not depend on the platform default.
    final String outputContents = new String(out.toByteArray(), StandardCharsets.UTF_8);
    // We can't be sure of the order of the fields in the record, so we allow either 'least' or 'favorite' to be first
    assertTrue(outputContents.equals("John Doe,48,MapRecord[{favorite=basketball, least=soccer}]\n")
        || outputContents.equals("John Doe,48,MapRecord[{least=soccer, favorite=basketball}]\n"));
}
/**
 * Uses the route-to-success strategy with {@code RESULT_RECORD_FIELDS} and a result path of {@code /}.
 * Two records are read but only "John Doe" has a lookup entry; both are expected in one FlowFile on
 * success, the first with the two looked-up fields appended and the second left as it was.
 */
@Test
public void testAddFieldsToExistingRecordRouteToSuccess() throws InitializationException {
    // Swap the string lookup registered in setup() for a Record-valued one under the same id.
    final RecordLookup recordLookup = new RecordLookup();
    runner.addControllerService("lookup", recordLookup);
    runner.enableControllerService(recordLookup);
    runner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_SUCCESS);

    // Even if the looked up record's original schema is not nullable, the result record's enriched fields should be nullable.
    final List<RecordField> fields = new ArrayList<>();
    fields.add(new RecordField("favorite", RecordFieldType.STRING.getDataType(), false));
    fields.add(new RecordField("least", RecordFieldType.STRING.getDataType(), true));
    final RecordSchema schema = new SimpleRecordSchema(fields);
    final Record sports = new MapRecord(schema, new HashMap<>());
    sports.setValue("favorite", "basketball");
    sports.setValue("least", "soccer");
    recordLookup.addValue("John Doe", sports);

    // Incoming Record doesn't have the fields to be enriched.
    recordReader = new MockRecordParser();
    recordReader.addSchemaField("name", RecordFieldType.STRING);
    recordReader.addSchemaField("age", RecordFieldType.INT);
    recordReader.addRecord("John Doe", 48);
    recordReader.addRecord("Jane Doe", 47);
    runner.addControllerService("reader", recordReader);
    runner.enableControllerService(recordReader);

    runner.setProperty("lookup", "/name");
    runner.setProperty(LookupRecord.RESULT_RECORD_PATH, "/");
    runner.setProperty(LookupRecord.RESULT_CONTENTS, LookupRecord.RESULT_RECORD_FIELDS);

    runner.enqueue("");
    runner.run();

    // Report a routing problem as an assertion failure instead of an IndexOutOfBoundsException below.
    runner.assertAllFlowFilesTransferred(LookupRecord.REL_SUCCESS, 1);
    final MockFlowFile out = runner.getFlowFilesForRelationship(LookupRecord.REL_SUCCESS).get(0);
    out.assertContentEquals("John Doe,48,soccer,basketball\nJane Doe,47\n");
}
/**
 * Runs an in-place replacement ({@code REPLACE_EXISTING_VALUES}) over JSON input using three dynamic
 * lookup paths, two of which address elements of the {@code locales} array. The output routed to success
 * must equal the expected JSON file.
 *
 * @throws InitializationException if a controller service cannot be registered
 * @throws IOException if a test resource file cannot be read
 */
@Test
public void testLookupArray() throws InitializationException, IOException {
    // A dedicated runner is used so that nothing configured in setup() leaks into this test.
    final TestRunner jsonRunner = TestRunners.newTestRunner(LookupRecord.class);
    final MapLookup inPlaceLookup = new MapLookupForInPlaceReplacement();

    // Register each service exactly once, then configure and enable it, so that the property set here
    // is the configuration in effect when the service is enabled.
    final JsonTreeReader jsonReader = new JsonTreeReader();
    jsonRunner.addControllerService("reader", jsonReader);
    jsonRunner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA);
    jsonRunner.enableControllerService(jsonReader);

    final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
    jsonRunner.addControllerService("writer", jsonWriter);
    jsonRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
    jsonRunner.enableControllerService(jsonWriter);

    jsonRunner.addControllerService("lookup", inPlaceLookup);
    jsonRunner.enableControllerService(inPlaceLookup);

    jsonRunner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_SUCCESS);
    jsonRunner.setProperty(LookupRecord.REPLACEMENT_STRATEGY, LookupRecord.REPLACE_EXISTING_VALUES);
    jsonRunner.setProperty(LookupRecord.RECORD_READER, "reader");
    jsonRunner.setProperty(LookupRecord.RECORD_WRITER, "writer");
    jsonRunner.setProperty(LookupRecord.LOOKUP_SERVICE, "lookup");
    jsonRunner.setProperty("lookupLanguage", "/locales[*]/language");
    jsonRunner.setProperty("lookupRegion", "/locales[*]/region");
    jsonRunner.setProperty("lookupFoo", "/foo/foo");

    inPlaceLookup.addValue("FR", "France");
    inPlaceLookup.addValue("CA", "Canada");
    inPlaceLookup.addValue("fr", "French");
    inPlaceLookup.addValue("key", "value");

    jsonRunner.enqueue(new File("src/test/resources/TestLookupRecord/lookup-array-input.json").toPath());
    jsonRunner.run();

    jsonRunner.assertAllFlowFilesTransferred(LookupRecord.REL_SUCCESS);
    final MockFlowFile out = jsonRunner.getFlowFilesForRelationship(LookupRecord.REL_SUCCESS).get(0);
    out.assertContentEquals(new File("src/test/resources/TestLookupRecord/lookup-array-output.json").toPath());
}
/**
 * Same JSON in-place replacement setup as {@code testLookupArray}, but routed to matched/unmatched and
 * with the lookup table holding {@code "badkey"} where that test registers {@code "key"}. Everything is
 * expected on the unmatched relationship.
 *
 * @throws InitializationException if a controller service cannot be registered
 * @throws IOException if the test resource file cannot be read
 */
@Test
public void testLookupArrayKeyNotInLRS() throws InitializationException, IOException {
    // A dedicated runner is used so that nothing configured in setup() leaks into this test.
    final TestRunner jsonRunner = TestRunners.newTestRunner(LookupRecord.class);
    final MapLookup inPlaceLookup = new MapLookupForInPlaceReplacement();

    // Register each service exactly once, then configure and enable it, so that the property set here
    // is the configuration in effect when the service is enabled.
    final JsonTreeReader jsonReader = new JsonTreeReader();
    jsonRunner.addControllerService("reader", jsonReader);
    jsonRunner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA);
    jsonRunner.enableControllerService(jsonReader);

    final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
    jsonRunner.addControllerService("writer", jsonWriter);
    jsonRunner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA);
    jsonRunner.enableControllerService(jsonWriter);

    jsonRunner.addControllerService("lookup", inPlaceLookup);
    jsonRunner.enableControllerService(inPlaceLookup);

    jsonRunner.setProperty(LookupRecord.ROUTING_STRATEGY, LookupRecord.ROUTE_TO_MATCHED_UNMATCHED);
    jsonRunner.setProperty(LookupRecord.REPLACEMENT_STRATEGY, LookupRecord.REPLACE_EXISTING_VALUES);
    jsonRunner.setProperty(LookupRecord.RECORD_READER, "reader");
    jsonRunner.setProperty(LookupRecord.RECORD_WRITER, "writer");
    jsonRunner.setProperty(LookupRecord.LOOKUP_SERVICE, "lookup");
    jsonRunner.setProperty("lookupLanguage", "/locales[*]/language");
    jsonRunner.setProperty("lookupRegion", "/locales[*]/region");
    jsonRunner.setProperty("lookupFoo", "/foo/foo");

    inPlaceLookup.addValue("FR", "France");
    inPlaceLookup.addValue("CA", "Canada");
    inPlaceLookup.addValue("fr", "French");
    inPlaceLookup.addValue("badkey", "value");

    jsonRunner.enqueue(new File("src/test/resources/TestLookupRecord/lookup-array-input.json").toPath());
    jsonRunner.run();

    jsonRunner.assertAllFlowFilesTransferred(LookupRecord.REL_UNMATCHED);
}
/**
 * In-memory {@link StringLookupService} backed by a {@link HashMap}. It resolves the single coordinate
 * named {@code lookup} and can optionally assert that the context passed to a lookup contains a set of
 * expected entries.
 */
private static class MapLookup extends AbstractControllerService implements StringLookupService {
    /** Lookup table; protected so that subclasses can resolve keys against it. */
    protected final Map<String, String> values = new HashMap<>();
    /** Entries the lookup context must contain, or {@code null} to skip the context check. */
    private Map<String, Object> expectedContext;

    /** Registers {@code value} as the lookup result for {@code key}. */
    public void addValue(final String key, final String value) {
        values.put(key, value);
    }

    @Override
    public Class<?> getValueType() {
        return String.class;
    }

    /** Checks the context against the expected entries (if any), then performs the plain lookup. */
    @Override
    public Optional<String> lookup(final Map<String, Object> coordinates, Map<String, String> context) {
        validateContext(context);
        return lookup(coordinates);
    }

    /**
     * Returns the value registered for the {@code lookup} coordinate.
     *
     * @return the registered value, or empty when the coordinates are {@code null}, the coordinate is
     *         missing or {@code null}, or no value is registered for it
     */
    @Override
    public Optional<String> lookup(final Map<String, Object> coordinates) {
        if (coordinates == null) {
            return Optional.empty();
        }
        final String key = (String) coordinates.get("lookup");
        if (key == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(values.get(key));
    }

    @Override
    public Set<String> getRequiredKeys() {
        return Collections.singleton("lookup");
    }

    /** Sets the entries that every subsequent context-aware lookup must find in its context. */
    public void setExpectedContext(Map<String, Object> expectedContext) {
        this.expectedContext = expectedContext;
    }

    /** Fails the running test if an expected entry is missing from, or differs in, {@code context}. */
    private void validateContext(Map<String, String> context) {
        if (expectedContext == null) {
            return;
        }
        // Fail with a readable message rather than a NullPointerException in the loop below.
        Assert.assertNotNull("Lookup context was null", context);
        for (Map.Entry<String, Object> entry : expectedContext.entrySet()) {
            Assert.assertTrue(String.format("%s was not in context.", entry.getKey()),
                context.containsKey(entry.getKey()));
            Assert.assertEquals("Wrong value", entry.getValue(), context.get(entry.getKey()));
        }
    }
}
/**
 * In-memory {@link RecordLookupService} backed by a {@link HashMap}. It resolves the single coordinate
 * named {@code lookup} to a previously registered {@link Record}.
 */
private static class RecordLookup extends AbstractControllerService implements RecordLookupService {
    private final Map<String, Record> values = new HashMap<>();

    /** Registers {@code value} as the lookup result for {@code key}. */
    public void addValue(final String key, final Record value) {
        values.put(key, value);
    }

    /** Reports {@link Record}, the type this service actually returns from {@code lookup}. */
    @Override
    public Class<?> getValueType() {
        return Record.class;
    }

    /**
     * Returns the Record registered for the {@code lookup} coordinate.
     *
     * @return the registered Record, or empty when the coordinates are {@code null}, the coordinate is
     *         missing or {@code null}, or no Record is registered for it
     */
    @Override
    public Optional<Record> lookup(final Map<String, Object> coordinates) {
        if (coordinates == null) {
            return Optional.empty();
        }
        final String key = (String) coordinates.get("lookup");
        if (key == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(values.get(key));
    }

    @Override
    public Set<String> getRequiredKeys() {
        return Collections.singleton("lookup");
    }
}
/**
 * Variant of {@link MapLookup} that ignores the coordinate name and uses the first coordinate value as
 * the key, so it can serve lookups issued under arbitrary dynamic property names.
 */
private static class MapLookupForInPlaceReplacement extends MapLookup {
    /**
     * Returns the value registered for the first coordinate value.
     *
     * @return the registered value, or empty when the coordinates are {@code null} or empty, the first
     *         value is {@code null}, or no value is registered for it
     */
    @Override
    public Optional<String> lookup(final Map<String, Object> coordinates) {
        // Mirror the base class: missing coordinates mean "no match" rather than an exception.
        if (coordinates == null || coordinates.isEmpty()) {
            return Optional.empty();
        }
        final String key = (String) coordinates.values().iterator().next();
        if (key == null) {
            return Optional.empty();
        }
        return Optional.ofNullable(values.get(key));
    }
}
}