/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.processors.orc;

import org.apache.avro.Schema;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.SystemUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcStruct;
import org.apache.hadoop.hive.ql.io.orc.Reader;
import org.apache.hadoop.hive.ql.io.orc.RecordReader;
import org.apache.hadoop.hive.serde2.io.DateWritableV2;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritableV2;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.hadoop.exception.FailureException;
import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Mockito;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
import java.sql.Timestamp;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.temporal.ChronoField;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.function.BiFunction;

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
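
/**
 * Unit tests for the PutORC processor, covering its success, failure, and retry
 * relationships as well as the Hive DDL, record count, and provenance information
 * it emits for written ORC files.
 */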
public class PutORCTest {
private static final String DIRECTORY = "target";
private static final String TEST_CONF_PATH = "src/test/resources/core-site.xml";
private Schema schema;
private Configuration testConf;
private PutORC proc;
private TestRunner testRunner;
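
    // These tests rely on *nix filesystem behavior, so the whole class is skipped on Windows.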
@BeforeClass
public static void setupBeforeClass() {
Assume.assumeTrue("Test only runs on *nix", !SystemUtils.IS_OS_WINDOWS);
}
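
    // Load the default user schema and the test Hadoop configuration before each test.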
@Before
public void setup() throws IOException {
        final String avroSchema;
        try (final InputStream schemaStream = new FileInputStream("src/test/resources/user.avsc")) {
            avroSchema = IOUtils.toString(schemaStream, StandardCharsets.UTF_8);
        }
        schema = new Schema.Parser().parse(avroSchema);
testConf = new Configuration();
testConf.addResource(new Path(TEST_CONF_PATH));
proc = new PutORC();
}
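
    /**
     * Configures a TestRunner for the given processor with a MockRecordParser as its record
     * reader. With no generator supplied, numUsers records matching user.avsc are queued,
     * of the form ("name" + i, i, "blue" + i, i * 10.0); otherwise the generator decides.
     */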
private void configure(final PutORC putORC, final int numUsers) throws InitializationException {
configure(putORC, numUsers, null);
}

    private void configure(final PutORC putORC, final int numUsers, final BiFunction<Integer, MockRecordParser, Void> recordGenerator) throws InitializationException {
testRunner = TestRunners.newTestRunner(putORC);
testRunner.setProperty(PutORC.HADOOP_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
testRunner.setProperty(PutORC.DIRECTORY, DIRECTORY);
MockRecordParser readerFactory = new MockRecordParser();
final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
for (final RecordField recordField : recordSchema.getFields()) {
readerFactory.addSchemaField(recordField);
}
if (recordGenerator == null) {
for (int i = 0; i < numUsers; i++) {
readerFactory.addRecord("name" + i, i, "blue" + i, i * 10.0);
}
} else {
recordGenerator.apply(numUsers, readerFactory);
}
testRunner.addControllerService("mock-reader-factory", readerFactory);
testRunner.enableControllerService(readerFactory);
testRunner.setProperty(PutORC.RECORD_READER, "mock-reader-factory");
}
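
    /**
     * Writes to the same target filename twice with OVERWRITE set to "true" (both runs should
     * succeed), then flips OVERWRITE to "false" and expects the next attempt to route to failure.
     */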
@Test
public void testOverwriteFile() throws InitializationException {
configure(proc, 1);
final String filename = "testORCWithDefaults-" + System.currentTimeMillis();
final Map<String, String> flowFileAttributes = new HashMap<>();
flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
testRunner.setProperty(PutORC.OVERWRITE, "true");
testRunner.enqueue("trigger", flowFileAttributes);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutORC.REL_SUCCESS, 1);
MockRecordParser readerFactory = (MockRecordParser) testRunner.getControllerService("mock-reader-factory");
readerFactory.addRecord("name", 1, "blue", 10.0);
testRunner.enqueue("trigger", flowFileAttributes);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutORC.REL_SUCCESS, 2);
testRunner.setProperty(PutORC.OVERWRITE, "false");
readerFactory.addRecord("name", 1, "blue", 10.0);
testRunner.enqueue("trigger", flowFileAttributes);
testRunner.run();
testRunner.assertTransferCount(PutORC.REL_FAILURE, 1);
}
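
    /**
     * Happy-path test: writes 100 records with default settings, then verifies the flow file
     * attributes (HDFS path, filename, record count, Hive DDL), the SEND provenance event, the
     * ORC content read back from disk, and that the temporary dot file was replaced by the
     * final file (plus the .crc sidecar written by Hadoop's local checksum filesystem).
     */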
@Test
public void testWriteORCWithDefaults() throws IOException, InitializationException {
configure(proc, 100);
final String filename = "testORCWithDefaults-" + System.currentTimeMillis();
final Map<String, String> flowFileAttributes = new HashMap<>();
flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
testRunner.setProperty(PutORC.HIVE_TABLE_NAME, "myTable");
testRunner.enqueue("trigger", flowFileAttributes);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutORC.REL_SUCCESS, 1);
final Path orcFile = new Path(DIRECTORY + "/" + filename);
// verify the successful flow file has the expected attributes
final MockFlowFile mockFlowFile = testRunner.getFlowFilesForRelationship(PutORC.REL_SUCCESS).get(0);
mockFlowFile.assertAttributeEquals(PutORC.ABSOLUTE_HDFS_PATH_ATTRIBUTE, orcFile.getParent().toString());
mockFlowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), filename);
mockFlowFile.assertAttributeEquals(PutORC.RECORD_COUNT_ATTR, "100");
mockFlowFile.assertAttributeEquals(PutORC.HIVE_DDL_ATTRIBUTE,
"CREATE EXTERNAL TABLE IF NOT EXISTS `myTable` (`name` STRING, `favorite_number` INT, `favorite_color` STRING, `scale` DOUBLE) STORED AS ORC");
// verify we generated a provenance event
final List<ProvenanceEventRecord> provEvents = testRunner.getProvenanceEvents();
assertEquals(1, provEvents.size());
// verify it was a SEND event with the correct URI
final ProvenanceEventRecord provEvent = provEvents.get(0);
assertEquals(ProvenanceEventType.SEND, provEvent.getEventType());
        // Against a real HDFS the transit URI would start with "hdfs://"; on the local filesystem, just assert it ends with the expected path.
Assert.assertTrue(provEvent.getTransitUri().endsWith(DIRECTORY + "/" + filename));
// verify the content of the ORC file by reading it back in
verifyORCUsers(orcFile, 100);
// verify we don't have the temp dot file after success
final File tempOrcFile = new File(DIRECTORY + "/." + filename);
Assert.assertFalse(tempOrcFile.exists());
// verify we DO have the CRC file after success
final File crcAvroORCFile = new File(DIRECTORY + "/." + filename + ".crc");
Assert.assertTrue(crcAvroORCFile.exists());
}
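
    /**
     * Verifies that Avro logical types survive the trip to ORC: time-millis stays INT in the
     * Hive DDL, while timestamp-millis, date, and decimal map to TIMESTAMP, DATE, and DECIMAL,
     * and the written values round-trip when the file is read back.
     */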
@Test
public void testWriteORCWithAvroLogicalTypes() throws IOException, InitializationException {
        final String avroSchema;
        try (final InputStream schemaStream = new FileInputStream("src/test/resources/user_logical_types.avsc")) {
            avroSchema = IOUtils.toString(schemaStream, StandardCharsets.UTF_8);
        }
        schema = new Schema.Parser().parse(avroSchema);
LocalTime nowTime = LocalTime.now();
LocalDateTime nowDateTime = LocalDateTime.now();
LocalDate nowDate = LocalDate.now();
final int timeMillis = nowTime.get(ChronoField.MILLI_OF_DAY);
final Timestamp timestampMillis = Timestamp.valueOf(nowDateTime);
final Date dt = Date.valueOf(nowDate);
final BigDecimal bigDecimal = new BigDecimal("92.12");
configure(proc, 10, (numUsers, readerFactory) -> {
for (int i = 0; i < numUsers; i++) {
readerFactory.addRecord(
i,
timeMillis,
timestampMillis,
dt,
bigDecimal);
}
return null;
});
final String filename = "testORCWithDefaults-" + System.currentTimeMillis();
final Map<String, String> flowFileAttributes = new HashMap<>();
flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
testRunner.setProperty(PutORC.HIVE_TABLE_NAME, "myTable");
testRunner.enqueue("trigger", flowFileAttributes);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutORC.REL_SUCCESS, 1);
final Path orcFile = new Path(DIRECTORY + "/" + filename);
// verify the successful flow file has the expected attributes
final MockFlowFile mockFlowFile = testRunner.getFlowFilesForRelationship(PutORC.REL_SUCCESS).get(0);
mockFlowFile.assertAttributeEquals(PutORC.ABSOLUTE_HDFS_PATH_ATTRIBUTE, orcFile.getParent().toString());
mockFlowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), filename);
mockFlowFile.assertAttributeEquals(PutORC.RECORD_COUNT_ATTR, "10");
        // The DDL is generated with field names normalized for Hive (e.g. lowercased) by default
mockFlowFile.assertAttributeEquals(PutORC.HIVE_DDL_ATTRIBUTE,
"CREATE EXTERNAL TABLE IF NOT EXISTS `myTable` (`id` INT, `timemillis` INT, `timestampmillis` TIMESTAMP, `dt` DATE, `dec` DECIMAL) STORED AS ORC");
// verify we generated a provenance event
final List<ProvenanceEventRecord> provEvents = testRunner.getProvenanceEvents();
assertEquals(1, provEvents.size());
// verify it was a SEND event with the correct URI
final ProvenanceEventRecord provEvent = provEvents.get(0);
assertEquals(ProvenanceEventType.SEND, provEvent.getEventType());
        // Against a real HDFS the transit URI would start with "hdfs://"; on the local filesystem, just assert it ends with the expected path.
Assert.assertTrue(provEvent.getTransitUri().endsWith(DIRECTORY + "/" + filename));
// verify the content of the ORC file by reading it back in
verifyORCUsers(orcFile, 10, (x, currUser) -> {
assertEquals((int) currUser, ((IntWritable) x.get(0)).get());
assertEquals(timeMillis, ((IntWritable) x.get(1)).get());
assertEquals(timestampMillis, ((TimestampWritableV2) x.get(2)).getTimestamp().toSqlTimestamp());
final DateFormat noTimeOfDayDateFormat = new SimpleDateFormat("yyyy-MM-dd");
noTimeOfDayDateFormat.setTimeZone(TimeZone.getTimeZone("gmt"));
assertEquals(noTimeOfDayDateFormat.format(dt), ((DateWritableV2) x.get(3)).get().toString());
assertEquals(bigDecimal, ((HiveDecimalWritable) x.get(4)).getHiveDecimal().bigDecimalValue());
return null;
}
);
// verify we don't have the temp dot file after success
final File tempOrcFile = new File(DIRECTORY + "/." + filename);
Assert.assertFalse(tempOrcFile.exists());
// verify we DO have the CRC file after success
final File crcAvroORCFile = new File(DIRECTORY + "/." + filename + ".crc");
Assert.assertTrue(crcAvroORCFile.exists());
}
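
    /**
     * A valid schema supplied through the my.schema flow file attribute (for Expression
     * Language-driven schema access) should result in a successful write.
     */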
@Test
public void testValidSchemaWithELShouldBeSuccessful() throws InitializationException {
configure(proc, 10);
final String filename = "testValidSchemaWithELShouldBeSuccessful-" + System.currentTimeMillis();
        // provide the schema in the my.schema attribute so it can be resolved via Expression Language
final Map<String, String> flowFileAttributes = new HashMap<>();
flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
flowFileAttributes.put("my.schema", schema.toString());
testRunner.enqueue("trigger", flowFileAttributes);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutORC.REL_SUCCESS, 1);
}
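
    /**
     * A MalformedRecordException from the record reader routes to failure: re-reading the
     * same content would only fail again.
     */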
@Test
public void testMalformedRecordExceptionFromReaderShouldRouteToFailure() throws InitializationException, IOException, MalformedRecordException, SchemaNotFoundException {
configure(proc, 10);
final org.apache.nifi.serialization.RecordReader recordReader = Mockito.mock(org.apache.nifi.serialization.RecordReader.class);
when(recordReader.nextRecord()).thenThrow(new MalformedRecordException("ERROR"));
final RecordReaderFactory readerFactory = Mockito.mock(RecordReaderFactory.class);
when(readerFactory.getIdentifier()).thenReturn("mock-reader-factory");
when(readerFactory.createRecordReader(any(FlowFile.class), any(InputStream.class), any(ComponentLog.class))).thenReturn(recordReader);
testRunner.addControllerService("mock-reader-factory", readerFactory);
testRunner.enableControllerService(readerFactory);
testRunner.setProperty(PutORC.RECORD_READER, "mock-reader-factory");
final String filename = "testMalformedRecordExceptionShouldRouteToFailure-" + System.currentTimeMillis();
final Map<String, String> flowFileAttributes = new HashMap<>();
flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
testRunner.enqueue("trigger", flowFileAttributes);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutORC.REL_FAILURE, 1);
}
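
    /**
     * An IOException while creating the HDFS record writer is treated as transient, so the
     * flow file routes to retry rather than failure.
     */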
@Test
public void testIOExceptionCreatingWriterShouldRouteToRetry() throws InitializationException {
final PutORC proc = new PutORC() {
@Override
public HDFSRecordWriter createHDFSRecordWriter(ProcessContext context, FlowFile flowFile, Configuration conf, Path path, RecordSchema schema)
throws IOException {
throw new IOException("IOException");
}
};
configure(proc, 0);
final String filename = "testMalformedRecordExceptionShouldRouteToFailure-" + System.currentTimeMillis();
final Map<String, String> flowFileAttributes = new HashMap<>();
flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
testRunner.enqueue("trigger", flowFileAttributes);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutORC.REL_RETRY, 1);
}
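
    /**
     * An IOException thrown while reading records from the incoming flow file also routes to retry.
     */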
@Test
public void testIOExceptionFromReaderShouldRouteToRetry() throws InitializationException, IOException, MalformedRecordException, SchemaNotFoundException {
configure(proc, 10);
final RecordSet recordSet = Mockito.mock(RecordSet.class);
when(recordSet.next()).thenThrow(new IOException("ERROR"));
final org.apache.nifi.serialization.RecordReader recordReader = Mockito.mock(org.apache.nifi.serialization.RecordReader.class);
when(recordReader.createRecordSet()).thenReturn(recordSet);
when(recordReader.getSchema()).thenReturn(AvroTypeUtil.createSchema(schema));
final RecordReaderFactory readerFactory = Mockito.mock(RecordReaderFactory.class);
when(readerFactory.getIdentifier()).thenReturn("mock-reader-factory");
when(readerFactory.createRecordReader(any(FlowFile.class), any(InputStream.class), any(ComponentLog.class))).thenReturn(recordReader);
testRunner.addControllerService("mock-reader-factory", readerFactory);
testRunner.enableControllerService(readerFactory);
testRunner.setProperty(PutORC.RECORD_READER, "mock-reader-factory");
final String filename = "testMalformedRecordExceptionShouldRouteToFailure-" + System.currentTimeMillis();
final Map<String, String> flowFileAttributes = new HashMap<>();
flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
testRunner.enqueue("trigger", flowFileAttributes);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutORC.REL_RETRY, 1);
}
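
    /**
     * An IOException while renaming the temporary dot file to its final name routes to retry,
     * and the temporary file must not be left behind.
     */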
@Test
public void testIOExceptionRenamingShouldRouteToRetry() throws InitializationException {
final PutORC proc = new PutORC() {
@Override
protected void rename(FileSystem fileSystem, Path srcFile, Path destFile)
throws IOException, InterruptedException, FailureException {
throw new IOException("IOException renaming");
}
};
configure(proc, 10);
final String filename = "testIOExceptionRenamingShouldRouteToRetry-" + System.currentTimeMillis();
final Map<String, String> flowFileAttributes = new HashMap<>();
flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
testRunner.enqueue("trigger", flowFileAttributes);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutORC.REL_RETRY, 1);
        // verify the temp dot file was cleaned up even though the rename failed
final File tempAvroORCFile = new File(DIRECTORY + "/." + filename);
Assert.assertFalse(tempAvroORCFile.exists());
}
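
    /**
     * Verifies that a record containing a nested record (see nested_record.avsc) is written
     * successfully. The runner is configured inline here because the schema differs from user.avsc.
     */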
@Test
public void testNestedRecords() throws Exception {
testRunner = TestRunners.newTestRunner(proc);
testRunner.setProperty(PutORC.HADOOP_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
testRunner.setProperty(PutORC.DIRECTORY, DIRECTORY);
MockRecordParser readerFactory = new MockRecordParser();
        final String avroSchema;
        try (final InputStream schemaStream = new FileInputStream("src/test/resources/nested_record.avsc")) {
            avroSchema = IOUtils.toString(schemaStream, StandardCharsets.UTF_8);
        }
        schema = new Schema.Parser().parse(avroSchema);
final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
for (final RecordField recordField : recordSchema.getFields()) {
readerFactory.addSchemaField(recordField);
}
Map<String,Object> nestedRecordMap = new HashMap<>();
nestedRecordMap.put("id", 11088000000001615L);
nestedRecordMap.put("x", "Hello World!");
RecordSchema nestedRecordSchema = AvroTypeUtil.createSchema(schema.getField("myField").schema());
MapRecord nestedRecord = new MapRecord(nestedRecordSchema, nestedRecordMap);
        // The nested record becomes the value of the "myField" field already defined in the schema
readerFactory.addRecord(nestedRecord);
testRunner.addControllerService("mock-reader-factory", readerFactory);
testRunner.enableControllerService(readerFactory);
testRunner.setProperty(PutORC.RECORD_READER, "mock-reader-factory");
testRunner.enqueue("trigger");
testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutORC.REL_SUCCESS, 1);
}
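
    /**
     * A qualified table name like "mydb.myTable" should have each section quoted separately
     * in the generated DDL: `mydb`.`myTable`.
     */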
@Test
public void testDDLQuoteTableNameSections() throws IOException, InitializationException {
configure(proc, 100);
final String filename = "testORCWithDefaults-" + System.currentTimeMillis();
final Map<String, String> flowFileAttributes = new HashMap<>();
flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
testRunner.setProperty(PutORC.HIVE_TABLE_NAME, "mydb.myTable");
testRunner.enqueue("trigger", flowFileAttributes);
testRunner.run();
testRunner.assertAllFlowFilesTransferred(PutORC.REL_SUCCESS, 1);
final Path orcFile = new Path(DIRECTORY + "/" + filename);
// verify the successful flow file has the expected attributes
final MockFlowFile mockFlowFile = testRunner.getFlowFilesForRelationship(PutORC.REL_SUCCESS).get(0);
mockFlowFile.assertAttributeEquals(PutORC.HIVE_DDL_ATTRIBUTE,
"CREATE EXTERNAL TABLE IF NOT EXISTS `mydb`.`myTable` (`name` STRING, `favorite_number` INT, `favorite_color` STRING, `scale` DOUBLE) STORED AS ORC");
}
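
    /**
     * Reads the written ORC file back with the Hive ORC Reader and asserts the expected row
     * count. The default per-row assertions match the records generated by configure() for
     * user.avsc; tests using other schemas supply their own assertion function.
     */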
private void verifyORCUsers(final Path orcUsers, final int numExpectedUsers) throws IOException {
verifyORCUsers(orcUsers, numExpectedUsers, null);
}

    private void verifyORCUsers(final Path orcUsers, final int numExpectedUsers, BiFunction<List<Object>, Integer, Void> assertFunction) throws IOException {
Reader reader = OrcFile.createReader(orcUsers, OrcFile.readerOptions(testConf));
RecordReader recordReader = reader.rows();
TypeInfo typeInfo =
TypeInfoUtils.getTypeInfoFromTypeString("struct<name:string,favorite_number:int,favorite_color:string,scale:double>");
StructObjectInspector inspector = (StructObjectInspector)
OrcStruct.createObjectInspector(typeInfo);
int currUser = 0;
Object nextRecord = null;
while ((nextRecord = recordReader.next(nextRecord)) != null) {
Assert.assertNotNull(nextRecord);
Assert.assertTrue("Not an OrcStruct", nextRecord instanceof OrcStruct);
List<Object> x = inspector.getStructFieldsDataAsList(nextRecord);
if (assertFunction == null) {
assertEquals("name" + currUser, x.get(0).toString());
assertEquals(currUser, ((IntWritable) x.get(1)).get());
assertEquals("blue" + currUser, x.get(2).toString());
assertEquals(10.0 * currUser, ((DoubleWritable) x.get(3)).get(), Double.MIN_VALUE);
} else {
assertFunction.apply(x, currUser);
}
currUser++;
}
assertEquals(numExpectedUsers, currUser);
}
}