blob: 72d1e046cdc947b966d6de855236838b7045c0de [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.avro;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.junit.Test;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
public class TestAvroReaderWithExplicitSchema {
@Test
public void testAvroExplicitReaderWithSchemalessFile() throws Exception {
File avroFileWithEmbeddedSchema = new File("src/test/resources/avro/avro_schemaless.avro");
FileInputStream fileInputStream = new FileInputStream(avroFileWithEmbeddedSchema);
Schema dataSchema = new Schema.Parser().parse(new File("src/test/resources/avro/avro_schemaless.avsc"));
RecordSchema recordSchema = new SimpleRecordSchema(dataSchema.toString(), AvroTypeUtil.AVRO_SCHEMA_FORMAT, null);
AvroReaderWithExplicitSchema avroReader = new AvroReaderWithExplicitSchema(fileInputStream, recordSchema, dataSchema);
GenericRecord record = avroReader.nextAvroRecord();
assertNotNull(record);
assertEquals(1, record.get("id"));
assertNotNull(record.get("key"));
assertEquals("value", record.get("key").toString());
record = avroReader.nextAvroRecord();
assertNotNull(record);
assertEquals(2, record.get("id"));
assertNull(record.get("key"));
record = avroReader.nextAvroRecord();
assertEquals(3, record.get("id"));
assertNotNull(record.get("key"));
assertEquals("hello", record.get("key").toString());
record = avroReader.nextAvroRecord();
assertNull(record);
}
@Test
public void testAvroExplicitReaderWithEmbeddedSchemaFile() throws Exception {
File avroFileWithEmbeddedSchema = new File("src/test/resources/avro/avro_embed_schema.avro");
FileInputStream fileInputStream = new FileInputStream(avroFileWithEmbeddedSchema);
Schema dataSchema = new Schema.Parser().parse(new File("src/test/resources/avro/avro_schemaless.avsc"));
RecordSchema recordSchema = new SimpleRecordSchema(dataSchema.toString(), AvroTypeUtil.AVRO_SCHEMA_FORMAT, null);
AvroReaderWithExplicitSchema avroReader = new AvroReaderWithExplicitSchema(fileInputStream, recordSchema, dataSchema);
GenericRecord record = avroReader.nextAvroRecord();
assertNotNull(record);
assertEquals(1, record.get("id"));
assertNotNull(record.get("key"));
assertEquals("value", record.get("key").toString());
record = avroReader.nextAvroRecord();
assertNotNull(record);
assertEquals(2, record.get("id"));
assertNull(record.get("key"));
record = avroReader.nextAvroRecord();
assertEquals(3, record.get("id"));
assertNotNull(record.get("key"));
assertEquals("hello", record.get("key").toString());
record = avroReader.nextAvroRecord();
assertNull(record);
}
@Test(expected = IOException.class)
public void testAvroExplicitReaderWithEmbeddedSchemaFileDifferentFromExplicitSchema() throws Exception {
File avroFileWithEmbeddedSchema = new File("src/test/resources/avro/avro_embed_schema.avro");
FileInputStream fileInputStream = new FileInputStream(avroFileWithEmbeddedSchema);
Schema dataSchema = new Schema.Parser().parse("{\"namespace\": \"nifi\",\"name\": \"test\",\"type\": \"record\",\"fields\": [{\"name\": \"id\",\"type\": \"int\"}]}");
RecordSchema recordSchema = new SimpleRecordSchema(dataSchema.toString(), AvroTypeUtil.AVRO_SCHEMA_FORMAT, null);
// Causes IOException in constructor due to schemas not matching
new AvroReaderWithExplicitSchema(fileInputStream, recordSchema, dataSchema);
}
@Test
public void testAvroExplicitReaderWithSchemalessFileDecimalValuesWithDifferentBufferSize() throws Exception {
// GIVEN
String avroFilePath = "src/test/resources/avro/avro_schemaless_decimal.avro";
String avroSchemaPath = "src/test/resources/avro/avro_schemaless_decimal.avsc";
List<Map<String, String>> expectedRecords = new ArrayList<>(4);
Map<String, String> record = new HashMap<>();
record.put("id", "AAAAAA");
record.put("price", "0.000000");
expectedRecords.add(record);
record = new HashMap<>();
record.put("id", "BBBBBB");
record.put("price", "15000.000000");
expectedRecords.add(record);
record = new HashMap<>();
record.put("id", "CCCCCC");
record.put("price", "0.000000");
expectedRecords.add(record);
record = new HashMap<>();
record.put("id", "DDDDDD");
record.put("price", "12.340000");
expectedRecords.add(record);
// WHEN
AvroReaderWithExplicitSchema avroReader = createAvroReaderWithExplicitSchema(avroFilePath, avroSchemaPath);
List<Record> actualRecords = collectRecords(avroReader);
// THEN
assertEquals(expectedRecords.size(), actualRecords.size());
for (int i = 0; i < expectedRecords.size(); ++i) {
Map<String, String> expectedRecord = expectedRecords.get(i);
Record actualRecord = actualRecords.get(i);
assertEquals(expectedRecord.get("id"), actualRecord.getValue("id").toString());
assertEquals(expectedRecord.get("price"), actualRecord.getValue("price").toString());
}
}
private AvroReaderWithExplicitSchema createAvroReaderWithExplicitSchema(String avroFilePath, String avroSchemaPath) throws IOException {
FileInputStream avroFileInputStream = new FileInputStream(avroFilePath);
Schema avroSchema = new Schema.Parser().parse(new FileInputStream(avroSchemaPath));
RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);
return new AvroReaderWithExplicitSchema(avroFileInputStream, recordSchema, avroSchema);
}
private List<Record> collectRecords(RecordReader reader) throws IOException, MalformedRecordException {
List<Record> records = new ArrayList<>();
Record record = reader.nextRecord();
while (record != null) {
records.add(record);
record = reader.nextRecord();
}
return records;
}
}