/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.parquet;

import org.apache.avro.generic.GenericRecord;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.util.MockComponentLog;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;

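/**
 * Tests for {@link ParquetRecordSetWriter}: writes user records to a Parquet file
 * through the factory-created writer and reads them back to verify the round trip.
 */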
public class TestParquetRecordSetWriter {

    private static final String SCHEMA_PATH = "src/test/resources/avro/user.avsc";
    private static final int USERS = 10;

    private ComponentLog componentLog;
    private ParquetRecordSetWriter recordSetWriterFactory;

    @BeforeEach
    public void setup() {
        recordSetWriterFactory = new ParquetRecordSetWriter();
        componentLog = new MockComponentLog("1234", recordSetWriterFactory);
    }
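
    /**
     * Writes users with the schema reported by the writer factory and verifies
     * that they can be read back from the resulting Parquet file.
     */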
    @Test
    public void testWriteUsers() throws IOException, SchemaNotFoundException, InitializationException {
        initRecordSetWriter();
        final RecordSchema writeSchema = recordSetWriterFactory.getSchema(Collections.emptyMap(), null);
        final File parquetFile = new File("target/testWriterUsers-" + System.currentTimeMillis());
        writeUsers(writeSchema, parquetFile);
        verifyParquetRecords(parquetFile);
    }
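
    /**
     * Same round trip as above, but with a schema whose declared format is not Avro,
     * so the writer cannot reuse an embedded Avro schema and must derive one from
     * the record fields.
     */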
    @Test
    public void testWriteUsersWhenSchemaFormatNotAvro() throws IOException, SchemaNotFoundException, InitializationException {
        initRecordSetWriter();
        final RecordSchema writeSchema = recordSetWriterFactory.getSchema(Collections.emptyMap(), null);
        final RecordSchema writeSchemaWithOtherFormat = new SimpleRecordSchema(writeSchema.getFields(), null, "OTHER-FORMAT", SchemaIdentifier.EMPTY);
        final File parquetFile = new File("target/testWriterUsers-" + System.currentTimeMillis());
        writeUsers(writeSchemaWithOtherFormat, parquetFile);
        verifyParquetRecords(parquetFile);
    }
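
    /**
     * Registers the writer factory as a controller service on a no-op processor,
     * configures it with the schema-text access strategy, and enables it.
     */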
    private void initRecordSetWriter() throws IOException, InitializationException {
        final TestRunner runner = TestRunners.newTestRunner(new AbstractProcessor() {
            @Override
            public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
            }
        });

        runner.addControllerService("writer", recordSetWriterFactory);

        final File schemaFile = new File(SCHEMA_PATH);
        final Map<PropertyDescriptor, String> properties = createPropertiesWithSchema(schemaFile);
        properties.forEach((k, v) -> runner.setProperty(recordSetWriterFactory, k, v));
        runner.enableControllerService(recordSetWriterFactory);
    }
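
    /**
     * Writes {@code USERS} user records with the given schema to the given file.
     */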
    private void writeUsers(final RecordSchema writeSchema, final File parquetFile) throws IOException {
        try (final OutputStream output = new FileOutputStream(parquetFile);
             final RecordSetWriter recordSetWriter = recordSetWriterFactory.createWriter(componentLog, writeSchema, output, Collections.emptyMap())) {
            for (int i = 0; i < USERS; i++) {
                final Map<String, Object> userFields = new HashMap<>();
                userFields.put("name", "user" + i);
                userFields.put("favorite_number", i);
                userFields.put("favorite_color", "blue");

                final Record userRecord = new MapRecord(writeSchema, userFields);
                recordSetWriter.write(userRecord);
            }
            recordSetWriter.flush();
        }
    }
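
    /**
     * Reads the file back with an Avro-based Parquet reader and asserts the
     * expected record count.
     */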
    private void verifyParquetRecords(final File parquetFile) throws IOException {
        final Configuration conf = new Configuration();
        final Path path = new Path(parquetFile.getPath());
        final InputFile inputFile = HadoopInputFile.fromPath(path, conf);

        try (final ParquetReader<GenericRecord> reader =
                AvroParquetReader.<GenericRecord>builder(inputFile).withConf(conf).build()) {
            int recordCount = 0;
            while (reader.read() != null) {
                recordCount++;
            }
            assertEquals(USERS, recordCount);
        }
    }
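
    /**
     * Builds controller service properties that select the schema-text access
     * strategy and supply the given schema, loaded from a file or passed directly
     * as text.
     */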
    private Map<PropertyDescriptor, String> createPropertiesWithSchema(final File schemaFile) throws IOException {
        return createPropertiesWithSchema(IOUtils.toString(schemaFile.toURI(), StandardCharsets.UTF_8));
    }

    private Map<PropertyDescriptor, String> createPropertiesWithSchema(final String schemaText) {
        final Map<PropertyDescriptor, String> propertyValues = new HashMap<>();
        propertyValues.put(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
        propertyValues.put(SchemaAccessUtils.SCHEMA_TEXT, schemaText);
        return propertyValues;
    }
}