blob: 65b273624658474cbf3de56354ca7471839df0c6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.repository.schema;
import org.junit.jupiter.api.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestSchemaRecordReader {
@Test
public void testReadExactlyOnceFields() throws IOException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new SimpleRecordField("int", FieldType.INT, Repetition.EXACTLY_ONE));
fields.add(new SimpleRecordField("boolean", FieldType.BOOLEAN, Repetition.EXACTLY_ONE));
fields.add(new SimpleRecordField("byte array", FieldType.BYTE_ARRAY, Repetition.EXACTLY_ONE));
fields.add(new SimpleRecordField("long", FieldType.LONG, Repetition.EXACTLY_ONE));
fields.add(new SimpleRecordField("string", FieldType.STRING, Repetition.EXACTLY_ONE));
fields.add(new SimpleRecordField("long string", FieldType.LONG_STRING, Repetition.EXACTLY_ONE));
fields.add(new ComplexRecordField("complex", Repetition.EXACTLY_ONE,
new SimpleRecordField("key", FieldType.STRING, Repetition.EXACTLY_ONE),
new SimpleRecordField("value", FieldType.STRING, Repetition.EXACTLY_ONE)));
fields.add(new MapRecordField("map",
new SimpleRecordField("key", FieldType.STRING, Repetition.EXACTLY_ONE),
new SimpleRecordField("value", FieldType.STRING, Repetition.ZERO_OR_ONE), Repetition.EXACTLY_ONE));
fields.add(new UnionRecordField("union1", Repetition.EXACTLY_ONE, Arrays.asList(new RecordField[] {
new SimpleRecordField("one", FieldType.STRING, Repetition.EXACTLY_ONE),
new SimpleRecordField("two", FieldType.INT, Repetition.EXACTLY_ONE)
})));
fields.add(new UnionRecordField("union2", Repetition.EXACTLY_ONE, Arrays.asList(new RecordField[] {
new SimpleRecordField("one", FieldType.STRING, Repetition.EXACTLY_ONE),
new SimpleRecordField("two", FieldType.INT, Repetition.EXACTLY_ONE)
})));
final RecordSchema schema = new RecordSchema(fields);
final SchemaRecordReader reader = SchemaRecordReader.fromSchema(schema, new NoOpFieldCache());
final byte[] buffer;
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream dos = new DataOutputStream(baos)) {
dos.write(1); // sentinel byte
dos.writeInt(42);
dos.writeBoolean(true);
final byte[] array = "hello".getBytes();
dos.writeInt(array.length);
dos.write(array);
dos.writeLong(42L);
dos.writeUTF("hello");
final String longString = "hello";
final byte[] longStringArray = longString.getBytes(StandardCharsets.UTF_8);
dos.writeInt(longStringArray.length);
dos.write(longStringArray);
dos.writeUTF("key");
dos.writeUTF("value");
dos.writeInt(2);
dos.writeUTF("key1");
dos.writeBoolean(true);
dos.writeUTF("value1");
dos.writeUTF("key2");
dos.writeBoolean(false);
dos.writeUTF("one");
dos.writeUTF("hello");
dos.writeUTF("two");
dos.writeInt(42);
buffer = baos.toByteArray();
}
try (final ByteArrayInputStream in = new ByteArrayInputStream(buffer)) {
final Record record = reader.readRecord(in);
assertNotNull(record);
assertEquals(42, record.getFieldValue("int"));
assertTrue((boolean) record.getFieldValue("boolean"));
assertTrue(Arrays.equals("hello".getBytes(), (byte[]) record.getFieldValue("byte array")));
assertEquals(42L, record.getFieldValue("long"));
assertEquals("hello", record.getFieldValue("string"));
assertEquals("hello", record.getFieldValue("long string"));
final Record complexRecord = (Record) record.getFieldValue("complex");
assertEquals("key", complexRecord.getFieldValue(new SimpleRecordField("key", FieldType.STRING, Repetition.EXACTLY_ONE)));
assertEquals("value", complexRecord.getFieldValue(new SimpleRecordField("value", FieldType.STRING, Repetition.EXACTLY_ONE)));
final Map<String, String> map = new HashMap<>();
map.put("key1", "value1");
map.put("key2", null);
assertEquals(map, record.getFieldValue("map"));
assertEquals("hello", record.getFieldValue("union1"));
assertEquals(42, record.getFieldValue("union2"));
}
}
@Test
@SuppressWarnings("unchecked")
public void testReadZeroOrOneFields() throws IOException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new SimpleRecordField("int", FieldType.INT, Repetition.ZERO_OR_ONE));
fields.add(new SimpleRecordField("int present", FieldType.INT, Repetition.ZERO_OR_ONE));
fields.add(new SimpleRecordField("boolean", FieldType.BOOLEAN, Repetition.ZERO_OR_ONE));
fields.add(new SimpleRecordField("boolean present", FieldType.BOOLEAN, Repetition.ZERO_OR_ONE));
fields.add(new SimpleRecordField("byte array", FieldType.BYTE_ARRAY, Repetition.ZERO_OR_ONE));
fields.add(new SimpleRecordField("byte array present", FieldType.BYTE_ARRAY, Repetition.ZERO_OR_ONE));
fields.add(new SimpleRecordField("long", FieldType.LONG, Repetition.ZERO_OR_ONE));
fields.add(new SimpleRecordField("long present", FieldType.LONG, Repetition.ZERO_OR_ONE));
fields.add(new SimpleRecordField("string", FieldType.STRING, Repetition.ZERO_OR_ONE));
fields.add(new SimpleRecordField("string present", FieldType.STRING, Repetition.ZERO_OR_ONE));
fields.add(new SimpleRecordField("long string", FieldType.LONG_STRING, Repetition.ZERO_OR_ONE));
fields.add(new SimpleRecordField("long string present", FieldType.LONG_STRING, Repetition.ZERO_OR_ONE));
fields.add(new ComplexRecordField("complex", Repetition.ZERO_OR_ONE,
new SimpleRecordField("key", FieldType.STRING, Repetition.ZERO_OR_ONE),
new SimpleRecordField("value", FieldType.STRING, Repetition.ZERO_OR_ONE)));
fields.add(new ComplexRecordField("complex present", Repetition.ZERO_OR_ONE,
new SimpleRecordField("key", FieldType.STRING, Repetition.ZERO_OR_ONE),
new SimpleRecordField("value", FieldType.STRING, Repetition.ZERO_OR_ONE)));
fields.add(new MapRecordField("map",
new SimpleRecordField("key", FieldType.STRING, Repetition.ZERO_OR_ONE),
new SimpleRecordField("value", FieldType.STRING, Repetition.ZERO_OR_MORE), Repetition.ZERO_OR_ONE));
fields.add(new MapRecordField("map present",
new SimpleRecordField("key", FieldType.STRING, Repetition.ZERO_OR_ONE),
new SimpleRecordField("value", FieldType.STRING, Repetition.ZERO_OR_MORE), Repetition.ZERO_OR_ONE));
fields.add(new UnionRecordField("union", Repetition.ZERO_OR_ONE, Arrays.asList(new RecordField[] {
new SimpleRecordField("one", FieldType.STRING, Repetition.EXACTLY_ONE),
new SimpleRecordField("two", FieldType.INT, Repetition.EXACTLY_ONE)
})));
fields.add(new UnionRecordField("union present", Repetition.ZERO_OR_ONE, Arrays.asList(new RecordField[] {
new SimpleRecordField("one", FieldType.STRING, Repetition.EXACTLY_ONE),
new SimpleRecordField("two", FieldType.INT, Repetition.ZERO_OR_MORE)
})));
final RecordSchema schema = new RecordSchema(fields);
final SchemaRecordReader reader = SchemaRecordReader.fromSchema(schema, new NoOpFieldCache());
// for each field, make the first one missing and the second one present.
final byte[] buffer;
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
final DataOutputStream dos = new DataOutputStream(baos)) {
dos.write(1); // sentinel byte
dos.write(0);
dos.writeByte(1);
dos.writeInt(42);
dos.write(0);
dos.writeByte(1);
dos.writeBoolean(true);
final byte[] array = "hello".getBytes();
dos.write(0);
dos.writeByte(1);
dos.writeInt(array.length);
dos.write(array);
dos.write(0);
dos.writeByte(1);
dos.writeLong(42L);
dos.write(0);
dos.writeByte(1);
dos.writeUTF("hello");
final String longString = "hello";
final byte[] longStringArray = longString.getBytes(StandardCharsets.UTF_8);
dos.write(0);
dos.writeByte(1);
dos.writeInt(longStringArray.length);
dos.write(longStringArray);
dos.write(0);
dos.writeByte(1);
dos.writeByte(1);
dos.writeUTF("key");
dos.writeByte(0);
dos.writeBoolean(false); // map not present
dos.writeBoolean(true); // map present
dos.writeInt(2); // 2 entries in the map
dos.writeBoolean(true); // key present
dos.writeUTF("key1");
dos.writeInt(2); // 2 values
dos.writeUTF("one");
dos.writeUTF("two");
dos.writeBoolean(false); // key not present
dos.writeInt(1);
dos.writeUTF("three");
dos.writeBoolean(false);
dos.writeBoolean(true);
dos.writeUTF("two");
dos.writeInt(3); // 3 entries
dos.writeInt(1);
dos.writeInt(2);
dos.writeInt(3);
buffer = baos.toByteArray();
}
try (final ByteArrayInputStream in = new ByteArrayInputStream(buffer)) {
final Record record = reader.readRecord(in);
assertNotNull(record);
// Read everything into a map and make sure that no value is missing that has a name ending in " present"
final Map<String, Object> valueMap = new HashMap<>();
for (final RecordField field : record.getSchema().getFields()) {
final Object value = record.getFieldValue(field);
if (value == null) {
assertFalse(field.getFieldName().endsWith(" present"));
continue;
}
valueMap.put(field.getFieldName(), value);
}
assertEquals(42, valueMap.get("int present"));
assertTrue((boolean) valueMap.get("boolean present"));
assertTrue(Arrays.equals("hello".getBytes(), (byte[]) valueMap.get("byte array present")));
assertEquals(42L, valueMap.get("long present"));
assertEquals("hello", valueMap.get("string present"));
assertEquals("hello", valueMap.get("long string present"));
final Record complexRecord = (Record) valueMap.get("complex present");
assertEquals("key", complexRecord.getFieldValue(new SimpleRecordField("key", FieldType.STRING, Repetition.EXACTLY_ONE)));
assertNull(complexRecord.getFieldValue(new SimpleRecordField("value", FieldType.STRING, Repetition.EXACTLY_ONE)));
final Map<String, List<String>> map = (Map<String, List<String>>) valueMap.get("map present");
assertNotNull(map);
assertEquals(2, map.size());
assertTrue(map.containsKey(null));
assertTrue(map.containsKey("key1"));
final List<String> key1Values = Arrays.asList(new String[] {"one", "two"});
assertEquals(key1Values, map.get("key1"));
final List<String> nullKeyValues = Arrays.asList(new String[] {"three"});
assertEquals(nullKeyValues, map.get(null));
final List<Integer> unionValues = (List<Integer>) valueMap.get("union present");
assertEquals(3, unionValues.size());
assertEquals(1, unionValues.get(0).intValue());
assertEquals(2, unionValues.get(1).intValue());
assertEquals(3, unionValues.get(2).intValue());
}
}
}