blob: 9b458a228d3a9fb15db07acea75fb14348da3a82 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.processors.standard;
import org.apache.nifi.avro.AvroReader;
import org.apache.nifi.avro.AvroReaderWithEmbeddedSchema;
import org.apache.nifi.avro.AvroRecordSetWriter;
import org.apache.nifi.csv.CSVReader;
import org.apache.nifi.csv.CSVRecordSetWriter;
import org.apache.nifi.json.JsonRecordSetWriter;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.nifi.xml.XMLReader;
import org.apache.nifi.xml.XMLRecordSetWriter;
import org.junit.Before;
import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Consumer;
import static org.junit.Assert.assertEquals;
/**
 * Base class for integration tests that run {@link ConvertRecord} with different
 * combinations of record readers and record writers (CSV, JSON, Avro and XML).
 *
 * <p>Each test selects a source format with one of the {@code fromXxx} methods and a target
 * format with one of the {@code toXxx} methods. These populate the reader/writer services,
 * their configurers, the input handler and the result handler, which are then handed to
 * {@link #testConversion}. Input and expected output are read from
 * {@code src/test/resources/TestConversions/data.int_float_string.<postfix>}, where the
 * postfix is supplied by the concrete subclass.
 */
public abstract class AbstractConversionIT {
    /** Common prefix of all test resource files; the format-specific postfix is appended to it. */
    private static final String RESOURCE_PATH_PREFIX = "src/test/resources/TestConversions/data.int_float_string.";

    /** Record reader service used for the conversion under test. */
    protected RecordReaderFactory reader;
    /** Enqueues the input flow file on the test runner. */
    protected Consumer<TestRunner> inputHandler;
    /** Applies reader-specific properties to the test runner; may be {@code null}. */
    protected Consumer<TestRunner> readerConfigurer;

    /** Record writer service used for the conversion under test. */
    protected RecordSetWriterFactory writer;
    /** Verifies the flow file produced by the conversion. */
    protected Consumer<MockFlowFile> resultHandler;
    /** Applies writer-specific properties to the test runner; may be {@code null}. */
    protected Consumer<TestRunner> writerConfigurer;

    /**
     * Clears all per-test configuration before each test.
     */
    @Before
    public void setUp() throws Exception {
        reader = null;
        inputHandler = null;
        readerConfigurer = null;

        writer = null;
        resultHandler = null;
        writerConfigurer = null;
    }

    @Test
    public void testCsvToJson() throws Exception {
        fromCsv(csvPostfix());
        toJson(jsonPostfix());

        runConfiguredConversion();
    }

    @Test
    public void testCsvToAvro() throws Exception {
        fromCsv(csvPostfix());
        toAvro(avroPostfix());

        runConfiguredConversion();
    }

    @Test
    public void testCsvToAvroToCsv() throws Exception {
        fromCsv(csvPostfix());

        AvroRecordSetWriter writer2 = new AvroRecordSetWriter();
        AvroReader reader2 = new AvroReader();

        toCsv(csvPostfix());

        testChain(writer2, reader2);
    }

    @Test
    public void testCsvToXml() throws Exception {
        fromCsv(csvPostfix());
        toXml(xmlPostfix());

        runConfiguredConversion();
    }

    @Test
    public void testJsonToCsv() throws Exception {
        fromJson(jsonPostfix());
        toCsv(csvPostfix());

        runConfiguredConversion();
    }

    @Test
    public void testJsonToAvro() throws Exception {
        fromJson(jsonPostfix());
        toAvro(avroPostfix());

        runConfiguredConversion();
    }

    @Test
    public void testJsonToAvroToJson() throws Exception {
        fromJson(jsonPostfix());

        AvroRecordSetWriter writer2 = new AvroRecordSetWriter();
        AvroReader reader2 = new AvroReader();

        toJson(jsonPostfix());

        testChain(writer2, reader2);
    }

    @Test
    public void testAvroToCsv() throws Exception {
        fromAvro(avroPostfix());
        toCsv(csvPostfix());

        runConfiguredConversion();
    }

    @Test
    public void testAvroToJson() throws Exception {
        fromAvro(avroPostfix());
        toJson(jsonPostfix());

        runConfiguredConversion();
    }

    @Test
    public void testAvroToXml() throws Exception {
        fromAvro(avroPostfix());
        toXml(xmlPostfix());

        runConfiguredConversion();
    }

    @Test
    public void testXmlToCsv() throws Exception {
        fromXml(xmlPostfix());
        toCsv(csvPostfix());

        runConfiguredConversion();
    }

    @Test
    public void testXmlToJson() throws Exception {
        fromXml(xmlPostfix());
        toJson(jsonPostfix());

        runConfiguredConversion();
    }

    @Test
    public void testXmlToAvro() throws Exception {
        fromXml(xmlPostfix());
        toAvro(avroPostfix());

        runConfiguredConversion();
    }

    @Test
    public void testXmlToAvroToXml() throws Exception {
        fromXml(xmlPostfix());

        AvroRecordSetWriter writer2 = new AvroRecordSetWriter();
        AvroReader reader2 = new AvroReader();

        toXml(xmlPostfix());

        testChain(writer2, reader2);
    }

    /** @return postfix of the CSV resource file used as input and as expected output */
    protected abstract String csvPostfix();

    /** @return postfix of the JSON resource file used as input and as expected output */
    protected abstract String jsonPostfix();

    /** @return postfix of the Avro resource file used as input and as expected output */
    protected abstract String avroPostfix();

    /** @return postfix of the XML resource file used as input and as expected output */
    protected abstract String xmlPostfix();

    /**
     * Hook invoked by every reader configurer; does nothing by default.
     *
     * @param testRunner the runner the reader service has been added to
     */
    protected void commonReaderConfiguration(TestRunner testRunner) {
    }

    /**
     * Hook invoked by every writer configurer; does nothing by default.
     *
     * @param testRunner the runner the writer service has been added to
     */
    protected void commonWriterConfiguration(TestRunner testRunner) {
    }

    /**
     * Selects CSV as the source format.
     *
     * @param postfix postfix of the resource file that provides the input
     */
    protected void fromCsv(String postfix) {
        reader = new CSVReader();
        inputHandler = stringInputHandler(getContent(postfix));

        readerConfigurer = this::commonReaderConfiguration;
    }

    /**
     * Selects JSON as the source format.
     *
     * @param postfix postfix of the resource file that provides the input
     */
    protected void fromJson(String postfix) {
        reader = new JsonTreeReader();
        inputHandler = stringInputHandler(getContent(postfix));

        readerConfigurer = this::commonReaderConfiguration;
    }

    /**
     * Selects XML as the source format; the reader's record format is set to
     * {@link XMLReader#RECORD_ARRAY}.
     *
     * @param postfix postfix of the resource file that provides the input
     */
    protected void fromXml(String postfix) {
        reader = new XMLReader();
        inputHandler = stringInputHandler(getContent(postfix));

        readerConfigurer = testRunner -> {
            commonReaderConfiguration(testRunner);
            // 'reader' is read from the field when the configurer runs, not when it is created.
            testRunner.setProperty(reader, XMLReader.RECORD_FORMAT, XMLReader.RECORD_ARRAY);
        };
    }

    /**
     * Selects Avro as the source format; the input is enqueued as raw bytes.
     *
     * @param postfix postfix of the resource file that provides the input
     */
    protected void fromAvro(String postfix) {
        reader = new AvroReader();
        inputHandler = byteInputHandler(getByteContent(postfix));

        readerConfigurer = this::commonReaderConfiguration;
    }

    /**
     * Selects CSV as the target format; the output must equal the resource file content.
     *
     * @param postfix postfix of the resource file that holds the expected output
     */
    protected void toCsv(String postfix) {
        writer = new CSVRecordSetWriter();
        resultHandler = stringOutputHandler(getContent(postfix));

        writerConfigurer = this::commonWriterConfiguration;
    }

    /**
     * Selects JSON as the target format with pretty printing enabled; the output must equal
     * the resource file content.
     *
     * @param postfix postfix of the resource file that holds the expected output
     */
    protected void toJson(String postfix) {
        writer = new JsonRecordSetWriter();
        resultHandler = stringOutputHandler(getContent(postfix));

        writerConfigurer = testRunner -> {
            commonWriterConfiguration(testRunner);
            testRunner.setProperty(writer, "Pretty Print JSON", "true");
        };
    }

    /**
     * Selects XML as the target format with pretty printing enabled and fixed root/record tag
     * names; the output must equal the resource file content.
     *
     * @param postfix postfix of the resource file that holds the expected output
     */
    protected void toXml(String postfix) {
        writer = new XMLRecordSetWriter();
        resultHandler = stringOutputHandler(getContent(postfix));

        writerConfigurer = testRunner -> {
            commonWriterConfiguration(testRunner);
            testRunner.setProperty(writer, "pretty_print_xml", "true");
            testRunner.setProperty(writer, "root_tag_name", "root");
            testRunner.setProperty(writer, "record_tag_name", "nifiRecord");
        };
    }

    /**
     * Selects Avro as the target format. Expected and actual output are both parsed into
     * records and compared as lists of maps rather than byte by byte.
     *
     * @param postfix postfix of the resource file that holds the expected output
     */
    protected void toAvro(String postfix) {
        writer = new AvroRecordSetWriter();
        resultHandler = mockFlowFile -> {
            try {
                List<Map<String, Object>> expected = getRecords(getByteContent(postfix));
                List<Map<String, Object>> actual = getRecords(mockFlowFile.toByteArray());

                assertEquals(expected, actual);
            } catch (IOException | MalformedRecordException e) {
                // Consumer cannot throw checked exceptions; unchecked ones propagate untouched.
                throw new RuntimeException(e);
            }
        };

        writerConfigurer = this::commonWriterConfiguration;
    }

    /**
     * @param input text to enqueue
     * @return a handler that enqueues {@code input} on the test runner it is given
     */
    protected Consumer<TestRunner> stringInputHandler(String input) {
        return testRunner -> testRunner.enqueue(input);
    }

    /**
     * @param input bytes to enqueue
     * @return a handler that enqueues {@code input} on the test runner it is given
     */
    protected Consumer<TestRunner> byteInputHandler(byte[] input) {
        return testRunner -> testRunner.enqueue(input);
    }

    /**
     * @param expected the expected flow file content
     * @return a handler asserting that a flow file's content equals {@code expected}
     */
    protected Consumer<MockFlowFile> stringOutputHandler(String expected) {
        return mockFlowFile -> mockFlowFile.assertContentEquals(expected);
    }

    /**
     * Reads a test resource file as text.
     *
     * <p>The bytes are decoded as UTF-8 so that the result does not depend on the platform
     * default charset of the machine running the test.
     *
     * @param postfix postfix of the resource file to read
     * @return the file content decoded as UTF-8
     */
    protected String getContent(String postfix) {
        return new String(getByteContent(postfix), StandardCharsets.UTF_8);
    }

    /**
     * Reads a test resource file as raw bytes.
     *
     * @param postfix postfix appended to the common resource path prefix
     * @return the complete file content
     * @throws RuntimeException if the file cannot be read; the {@link IOException} is the cause
     */
    protected byte[] getByteContent(String postfix) {
        final String path = RESOURCE_PATH_PREFIX + postfix;
        try {
            return Files.readAllBytes(Paths.get(path));
        } catch (IOException e) {
            throw new RuntimeException("Failed to read test resource " + path, e);
        }
    }

    /**
     * Parses Avro data that carries its own schema into a list of record maps.
     *
     * @param avroData Avro content with embedded schema
     * @return one map per record, in reading order
     * @throws IOException if reading fails
     * @throws MalformedRecordException if a record cannot be parsed
     */
    protected List<Map<String, Object>> getRecords(byte[] avroData) throws IOException, MalformedRecordException {
        try (RecordReader recordReader = new AvroReaderWithEmbeddedSchema(new ByteArrayInputStream(avroData))) {
            return getRecords(recordReader);
        }
    }

    /**
     * Drains a record reader into a list of record maps. The reader is not closed.
     *
     * @param reader the reader to drain
     * @return one map per record, in reading order
     * @throws IOException if reading fails
     * @throws MalformedRecordException if a record cannot be parsed
     */
    protected List<Map<String, Object>> getRecords(RecordReader reader) throws IOException, MalformedRecordException {
        List<Map<String, Object>> records = new ArrayList<>();

        Record record;
        while ((record = reader.nextRecord()) != null) {
            records.add(record.toMap());
        }

        return records;
    }

    /**
     * Runs two conversions back to back. The first uses the configured reader with
     * {@code writer2}; its output flow file is enqueued as the input of the second, which uses
     * {@code reader2} with the configured writer. The configured result handler checks the
     * output of the second conversion.
     *
     * @param writer2 writer of the intermediate format (used without a configurer)
     * @param reader2 reader of the intermediate format (used without a configurer)
     * @throws InitializationException if a controller service cannot be added to the first runner
     */
    protected void testChain(RecordSetWriterFactory writer2, RecordReaderFactory reader2) throws InitializationException {
        testConversion(reader, readerConfigurer, writer2, null,
            inputHandler,
            mockFlowFile -> {
                try {
                    testConversion(reader2, null, writer, writerConfigurer,
                        testRunner -> testRunner.enqueue(mockFlowFile),
                        resultHandler
                    );
                } catch (InitializationException e) {
                    // Consumer cannot throw checked exceptions; unchecked ones propagate untouched.
                    throw new RuntimeException(e);
                }
            });
    }

    /**
     * Runs a single {@link ConvertRecord} conversion on a fresh test runner.
     *
     * <p>The reader and writer are registered as controller services under random ids,
     * configured, enabled and wired to the processor. The input handler then enqueues the
     * input, the processor is run once, and exactly one flow file is expected on
     * {@link ConvertRecord#REL_SUCCESS}; that flow file is passed to the result handler.
     *
     * @param reader the record reader service
     * @param readerConfigurer configures the reader on the runner; may be {@code null}
     * @param writer the record writer service
     * @param writerConfigurer configures the writer on the runner; may be {@code null}
     * @param inputHandler enqueues the input on the runner
     * @param resultHandler verifies the resulting flow file
     * @throws InitializationException if a controller service cannot be added to the runner
     */
    protected <R extends RecordReaderFactory, W extends RecordSetWriterFactory> void testConversion(
        R reader,
        Consumer<TestRunner> readerConfigurer,
        W writer,
        Consumer<TestRunner> writerConfigurer,
        Consumer<TestRunner> inputHandler,
        Consumer<MockFlowFile> resultHandler
    ) throws InitializationException {
        final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);

        String readerId = UUID.randomUUID().toString();
        String writerId = UUID.randomUUID().toString();

        runner.addControllerService(readerId, reader);
        runner.addControllerService(writerId, writer);

        // Configurers are optional; they run after the services are added and before they are enabled.
        Optional.ofNullable(readerConfigurer).ifPresent(_configurer -> _configurer.accept(runner));
        Optional.ofNullable(writerConfigurer).ifPresent(_configurer -> _configurer.accept(runner));

        runner.enableControllerService(reader);
        runner.enableControllerService(writer);

        runner.setProperty(ConvertRecord.RECORD_READER, readerId);
        runner.setProperty(ConvertRecord.RECORD_WRITER, writerId);

        inputHandler.accept(runner);

        runner.run();

        runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1);
        MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0);

        resultHandler.accept(flowFile);
    }

    /** Runs a single conversion using the reader, writer and handlers currently held in the fields. */
    private void runConfiguredConversion() throws InitializationException {
        testConversion(reader, readerConfigurer, writer, writerConfigurer, inputHandler, resultHandler);
    }
}