/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.processors.parquet;

import org.apache.avro.Conversions;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
import org.mockito.AdditionalMatchers;
import org.mockito.Mockito;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.isNull;
import static org.mockito.Mockito.when;
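
// Tests for FetchParquet: Parquet files are written under target/ using the Avro schemas in
// src/test/resources/avro, and the processor's routing to success, failure, and retry is verified
// along with the record output produced by the configured record writer.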
@DisabledOnOs({ OS.WINDOWS })
public class FetchParquetTest {
static final String DIRECTORY = "target";
static final String TEST_CONF_PATH = "src/test/resources/core-site.xml";
static final String RECORD_HEADER = "name,favorite_number,favorite_color";
private static final int USERS = 10;
private Schema schema;
private Schema schemaWithArray;
private Schema schemaWithNullableArray;
private Schema schemaWithDecimal;
private Configuration testConf;
private FetchParquet proc;
private TestRunner testRunner;
@BeforeEach
public void setup() throws IOException {
final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/avro/user.avsc"), StandardCharsets.UTF_8);
schema = new Schema.Parser().parse(avroSchema);
final String avroSchemaWithArray = IOUtils.toString(new FileInputStream("src/test/resources/avro/user-with-array.avsc"), StandardCharsets.UTF_8);
schemaWithArray = new Schema.Parser().parse(avroSchemaWithArray);
final String avroSchemaWithNullableArray = IOUtils.toString(new FileInputStream("src/test/resources/avro/user-with-nullable-array.avsc"), StandardCharsets.UTF_8);
schemaWithNullableArray = new Schema.Parser().parse(avroSchemaWithNullableArray);
final String avroSchemaWithDecimal = IOUtils.toString(new FileInputStream("src/test/resources/avro/user-with-fixed-decimal.avsc"), StandardCharsets.UTF_8);
schemaWithDecimal = new Schema.Parser().parse(avroSchemaWithDecimal);
testConf = new Configuration();
testConf.addResource(new Path(TEST_CONF_PATH));
proc = new FetchParquet();
}
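
// Sets up a TestRunner for the given processor using the test core-site.xml and a MockRecordWriter
// that emits the CSV header followed by one comma-separated line per record.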
private void configure(final FetchParquet fetchParquet) throws InitializationException {
testRunner = TestRunners.newTestRunner(fetchParquet);
testRunner.setProperty(FetchParquet.HADOOP_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
final RecordSetWriterFactory writerFactory = new MockRecordWriter(RECORD_HEADER, false);
testRunner.addControllerService("mock-writer-factory", writerFactory);
testRunner.enableControllerService(writerFactory);
testRunner.setProperty(FetchParquet.RECORD_WRITER, "mock-writer-factory");
}
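
// Happy path: writes USERS users to a Parquet file, fetches it, and expects a single flow file on
// success with the expected record count attribute, MIME type, and CSV content.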
@Test
public void testFetchParquetToCSV() throws IOException, InitializationException {
configure(proc);
final File parquetDir = new File(DIRECTORY);
final File parquetFile = new File(parquetDir,"testFetchParquetToCSV.parquet");
writeParquetUsers(parquetFile);
final Map<String,String> attributes = new HashMap<>();
attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName());
testRunner.enqueue("TRIGGER", attributes);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1);
final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_SUCCESS).get(0);
flowFile.assertAttributeEquals(FetchParquet.RECORD_COUNT_ATTR, String.valueOf(USERS));
flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain");
// the mock record writer writes the header for each record, so strip the headers to leave just the record lines
String flowFileContent = new String(flowFile.toByteArray(), StandardCharsets.UTF_8);
flowFileContent = flowFileContent.replaceAll(RECORD_HEADER + "\n", "");
verifyCSVRecords(flowFileContent);
}
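
// The filename property evaluates to an empty string, so the fetch fails and the original
// flow file is routed to failure with the failure reason recorded as an attribute.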
@Test
public void testFetchWhenELEvaluatesToEmptyShouldRouteFailure() throws InitializationException {
configure(proc);
testRunner.setProperty(FetchParquet.FILENAME, "${missing.attr}");
testRunner.enqueue("TRIGGER");
testRunner.run();
testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_FAILURE, 1);
final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_FAILURE).get(0);
flowFile.assertAttributeEquals(FetchParquet.FETCH_FAILURE_REASON_ATTR, "Can not create a Path from an empty string");
flowFile.assertContentEquals("TRIGGER");
}
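
// Fetching a file that does not exist routes the original flow file to failure.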
@Test
public void testFetchWhenDoesNotExistShouldRouteToFailure() throws InitializationException {
configure(proc);
final String filename = "/tmp/does-not-exist-" + System.currentTimeMillis();
testRunner.setProperty(FetchParquet.FILENAME, filename);
testRunner.enqueue("TRIGGER");
testRunner.run();
testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_FAILURE, 1);
final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_FAILURE).get(0);
flowFile.assertAttributeEquals(FetchParquet.FETCH_FAILURE_REASON_ATTR, "File " + filename + " does not exist");
flowFile.assertContentEquals("TRIGGER");
}
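
// An IOException thrown while creating the record reader routes the flow file to retry.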
@Test
public void testIOExceptionCreatingReaderShouldRouteToRetry() throws InitializationException, IOException {
final FetchParquet proc = new FetchParquet() {
@Override
public HDFSRecordReader createHDFSRecordReader(ProcessContext context, FlowFile flowFile, Configuration conf, Path path)
throws IOException {
throw new IOException("IOException");
}
};
configure(proc);
final File parquetDir = new File(DIRECTORY);
final File parquetFile = new File(parquetDir,"testFetchParquetToCSV.parquet");
writeParquetUsers(parquetFile);
final Map<String,String> attributes = new HashMap<>();
attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName());
testRunner.enqueue("TRIGGER", attributes);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_RETRY, 1);
final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_RETRY).get(0);
flowFile.assertContentEquals("TRIGGER");
}
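
// An IOException thrown while reading records routes the flow file to retry.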
@Test
public void testIOExceptionWhileReadingShouldRouteToRetry() throws IOException, InitializationException {
final FetchParquet proc = new FetchParquet() {
@Override
public HDFSRecordReader createHDFSRecordReader(ProcessContext context, FlowFile flowFile, Configuration conf, Path path) {
return new HDFSRecordReader() {
@Override
public Record nextRecord() throws IOException {
throw new IOException("IOException");
}
@Override
public void close() {
}
};
}
};
configure(proc);
final File parquetDir = new File(DIRECTORY);
final File parquetFile = new File(parquetDir,"testFetchParquetToCSV.parquet");
writeParquetUsers(parquetFile);
final Map<String,String> attributes = new HashMap<>();
attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName());
testRunner.enqueue("TRIGGER", attributes);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_RETRY, 1);
final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_RETRY).get(0);
flowFile.assertContentEquals("TRIGGER");
}
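
// An IOException thrown by the record writer routes the flow file to retry; the record writer is a
// mocked RecordSetWriterFactory whose writer throws on write.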
@Test
public void testIOExceptionWhileWritingShouldRouteToRetry() throws InitializationException, IOException, SchemaNotFoundException {
configure(proc);
final RecordSetWriter recordSetWriter = Mockito.mock(RecordSetWriter.class);
when(recordSetWriter.write(any(Record.class))).thenThrow(new IOException("IOException"));
final RecordSetWriterFactory recordSetWriterFactory = Mockito.mock(RecordSetWriterFactory.class);
when(recordSetWriterFactory.getIdentifier()).thenReturn("mock-writer-factory");
when(recordSetWriterFactory.createWriter(any(ComponentLog.class), AdditionalMatchers.or(any(RecordSchema.class), isNull()), any(OutputStream.class), any(FlowFile.class)))
.thenReturn(recordSetWriter);
testRunner.addControllerService("mock-writer-factory", recordSetWriterFactory);
testRunner.enableControllerService(recordSetWriterFactory);
testRunner.setProperty(FetchParquet.RECORD_WRITER, "mock-writer-factory");
final File parquetDir = new File(DIRECTORY);
final File parquetFile = new File(parquetDir,"testFetchParquetToCSV.parquet");
writeParquetUsers(parquetFile);
final Map<String,String> attributes = new HashMap<>();
attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName());
testRunner.enqueue("TRIGGER", attributes);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_RETRY, 1);
final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_RETRY).get(0);
flowFile.assertContentEquals("TRIGGER");
}
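
// Users whose favorite_colors field is an array can be fetched successfully.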
@Test
public void testFetchWithArray() throws InitializationException, IOException {
configure(proc);
final File parquetDir = new File(DIRECTORY);
final File parquetFile = new File(parquetDir,"testFetchParquetWithArrayToCSV.parquet");
writeParquetUsersWithArray(parquetFile);
final Map<String,String> attributes = new HashMap<>();
attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName());
testRunner.enqueue("TRIGGER", attributes);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1);
}
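
// Users whose favorite_colors field is a nullable (union of null and array) type can be fetched successfully.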
@Test
public void testFetchWithNullableArray() throws InitializationException, IOException {
configure(proc);
final File parquetDir = new File(DIRECTORY);
final File parquetFile = new File(parquetDir,"testFetchParquetWithNullableArrayToCSV.parquet");
writeParquetUsersWithNullableArray(parquetFile);
final Map<String,String> attributes = new HashMap<>();
attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName());
testRunner.enqueue("TRIGGER", attributes);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1);
}
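
// Users with a fixed-type decimal amount field can be fetched successfully.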
@Test
public void testFetchParquetWithDecimal() throws InitializationException, IOException {
configure(proc);
final File parquetDir = new File(DIRECTORY);
final File parquetFile = new File(parquetDir,"testFetchParquetWithDecimal.parquet");
writeParquetUsersWithDecimal(parquetFile);
final Map<String,String> attributes = new HashMap<>();
attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName());
testRunner.enqueue("TRIGGER", attributes);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1);
}
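
// Asserts the CSV content contains exactly USERS lines of the form "Bob<i>,<i>,blue<i>".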
protected void verifyCSVRecords(String csvContent) {
final String[] splits = csvContent.split("[\\n]");
assertEquals(USERS, splits.length);
for (int i=0; i < USERS; i++) {
final String line = splits[i];
assertEquals("Bob" + i + "," + i + ",blue" + i, line);
}
}
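
// Builds an AvroParquetWriter for the given file and schema using the test Hadoop configuration.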
private AvroParquetWriter.Builder<GenericRecord> createAvroParquetWriter(final File parquetFile, final Schema schema) throws IOException {
final Path parquetPath = new Path(parquetFile.getPath());
return AvroParquetWriter
.<GenericRecord>builder(HadoopOutputFile.fromPath(parquetPath, testConf))
.withSchema(schema)
.withConf(testConf);
}
private void writeParquetUsers(final File parquetFile) throws IOException {
if (parquetFile.exists()) {
assertTrue(parquetFile.delete());
}
final AvroParquetWriter.Builder<GenericRecord> writerBuilder = createAvroParquetWriter(parquetFile, schema);
try (final ParquetWriter<GenericRecord> writer = writerBuilder.build()) {
for (int i=0; i < USERS; i++) {
final GenericRecord user = new GenericData.Record(schema);
user.put("name", "Bob" + i);
user.put("favorite_number", i);
user.put("favorite_color", "blue" + i);
writer.write(user);
}
}
}
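
// Writes USERS user records whose favorite_colors field is a single-element array.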
private void writeParquetUsersWithArray(final File parquetFile) throws IOException {
if (parquetFile.exists()) {
assertTrue(parquetFile.delete());
}
final AvroParquetWriter.Builder<GenericRecord> writerBuilder = createAvroParquetWriter(parquetFile, schemaWithArray);
final Schema favoriteColorsSchema = schemaWithArray.getField("favorite_colors").schema();
try (final ParquetWriter<GenericRecord> writer = writerBuilder.build()) {
for (int i=0; i < USERS; i++) {
final GenericRecord user = new GenericData.Record(schemaWithArray);
user.put("name", "Bob" + i);
user.put("favorite_number", i);
final GenericData.Array<String> colors = new GenericData.Array<>(1, favoriteColorsSchema);
colors.add("blue" + i);
user.put("favorite_color", colors);
writer.write(user);
}
}
}
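
// Writes USERS user records against the nullable-array schema, populating the favorite_colors field.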
private void writeParquetUsersWithNullableArray(final File parquetFile) throws IOException {
if (parquetFile.exists()) {
assertTrue(parquetFile.delete());
}
final AvroParquetWriter.Builder<GenericRecord> writerBuilder = createAvroParquetWriter(parquetFile, schemaWithNullableArray);
// use schemaWithArray here just to get the array schema for the favorite_colors field; in the overall
// schemaWithNullableArray that field is a union of null and the array schema
final Schema favoriteColorsSchema = schemaWithArray.getField("favorite_colors").schema();
try (final ParquetWriter<GenericRecord> writer = writerBuilder.build()) {
for (int i=0; i < USERS; i++) {
final GenericRecord user = new GenericData.Record(schemaWithNullableArray);
user.put("name", "Bob" + i);
user.put("favorite_number", i);
final GenericData.Array<String> colors = new GenericData.Array<>(1, favoriteColorsSchema);
colors.add("blue" + i);
user.put("favorite_color", colors);
writer.write(user);
}
}
}
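
// Writes USERS user records with a fixed-type decimal amount, converted with Avro's DecimalConversion.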
private void writeParquetUsersWithDecimal(final File parquetFile) throws IOException {
if (parquetFile.exists()) {
assertTrue(parquetFile.delete());
}
final BigDecimal initialAmount = new BigDecimal("1234567.0123456789");
final AvroParquetWriter.Builder<GenericRecord> writerBuilder = createAvroParquetWriter(parquetFile, schemaWithDecimal);
final List<Schema> amountSchemaUnion = schemaWithDecimal.getField("amount").schema().getTypes();
final Schema amountSchema = amountSchemaUnion.stream().filter(s -> s.getType() == Schema.Type.FIXED).findFirst().orElse(null);
assertNotNull(amountSchema);
final Conversions.DecimalConversion decimalConversion = new Conversions.DecimalConversion();
try (final ParquetWriter<GenericRecord> writer = writerBuilder.build()) {
for (int i=0; i < USERS; i++) {
final BigDecimal incrementedAmount = initialAmount.add(BigDecimal.valueOf(i));
final GenericRecord user = new GenericData.Record(schemaWithDecimal);
user.put("name", "Bob" + i);
user.put("amount", decimalConversion.toFixed(incrementedAmount, amountSchema, amountSchema.getLogicalType()));
writer.write(user);
}
}
}
}