blob: 3f773c8213b07aee25ae606a13505b74a3771cd2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.values;
import static org.apache.beam.sdk.schemas.Schema.toSchema;
import static org.apache.beam.sdk.values.Row.toRow;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
/** Unit tests for {@link Row}. */
public class RowTest {
@Rule public ExpectedException thrown = ExpectedException.none();
@Test
public void testCreatesNullRecord() {
Schema type =
Stream.of(
Schema.Field.of("f_byte", FieldType.BYTE).withNullable(true),
Schema.Field.of("f_int16", FieldType.INT16).withNullable(true),
Schema.Field.of("f_int32", FieldType.INT32).withNullable(true),
Schema.Field.of("f_int64", FieldType.INT64).withNullable(true),
Schema.Field.of("f_decimal", FieldType.DECIMAL).withNullable(true),
Schema.Field.of("f_float", FieldType.FLOAT).withNullable(true),
Schema.Field.of("f_double", FieldType.DOUBLE).withNullable(true),
Schema.Field.of("f_string", FieldType.STRING).withNullable(true),
Schema.Field.of("f_datetime", FieldType.DATETIME).withNullable(true),
Schema.Field.of("f_boolean", FieldType.BOOLEAN).withNullable(true),
Schema.Field.of("f_array", FieldType.array(FieldType.DATETIME)).withNullable(true),
Schema.Field.of("f_map", FieldType.map(FieldType.INT32, FieldType.DOUBLE))
.withNullable(true))
.collect(toSchema());
Row row = Row.nullRow(type);
assertNull(row.getByte("f_byte"));
assertNull(row.getByte(0));
assertNull(row.getInt16("f_int16"));
assertNull(row.getInt16(1));
assertNull(row.getInt32("f_int32"));
assertNull(row.getInt32(2));
assertNull(row.getInt64("f_int64"));
assertNull(row.getInt64(3));
assertNull(row.getDecimal("f_decimal"));
assertNull(row.getDecimal(4));
assertNull(row.getFloat("f_float"));
assertNull(row.getFloat(5));
assertNull(row.getDouble("f_double"));
assertNull(row.getDouble(6));
assertNull(row.getString("f_string"));
assertNull(row.getString(7));
assertNull(row.getDateTime("f_datetime"));
assertNull(row.getDateTime(8));
assertNull(row.getBoolean("f_boolean"));
assertNull(row.getBoolean(9));
assertNull(row.getBoolean("f_array"));
assertNull(row.getBoolean(10));
assertNull(row.getBoolean("f_map"));
assertNull(row.getBoolean(11));
}
@Test
public void testRejectsNullRecord() {
Schema type = Stream.of(Schema.Field.of("f_int", Schema.FieldType.INT32)).collect(toSchema());
thrown.expect(IllegalArgumentException.class);
Row.nullRow(type);
}
@Test
public void testCreatesRecord() {
Schema schema =
Schema.builder()
.addByteField("f_byte")
.addInt16Field("f_int16")
.addInt32Field("f_int32")
.addInt64Field("f_int64")
.addDecimalField("f_decimal")
.addFloatField("f_float")
.addDoubleField("f_double")
.addStringField("f_string")
.addDateTimeField("f_datetime")
.addBooleanField("f_boolean")
.build();
DateTime dateTime =
new DateTime().withDate(1979, 03, 14).withTime(1, 2, 3, 4).withZone(DateTimeZone.UTC);
Row row =
Row.withSchema(schema)
.addValues(
(byte) 0, (short) 1, 2, 3L, new BigDecimal(2.3), 1.2f, 3.0d, "str", dateTime, false)
.build();
assertEquals((byte) 0, (Object) row.getByte("f_byte"));
assertEquals((byte) 0, (Object) row.getByte(0));
assertEquals((short) 1, (Object) row.getInt16("f_int16"));
assertEquals((short) 1, (Object) row.getInt16(1));
assertEquals((int) 2, (Object) row.getInt32("f_int32"));
assertEquals((int) 2, (Object) row.getInt32(2));
assertEquals((long) 3, (Object) row.getInt64("f_int64"));
assertEquals((long) 3, (Object) row.getInt64(3));
assertEquals(new BigDecimal(2.3), row.getDecimal("f_decimal"));
assertEquals(new BigDecimal(2.3), row.getDecimal(4));
assertEquals(1.2f, row.getFloat("f_float"), 0);
assertEquals(1.2f, row.getFloat(5), 0);
assertEquals(3.0d, row.getDouble("f_double"), 0);
assertEquals(3.0d, row.getDouble(6), 0);
assertEquals("str", row.getString("f_string"));
assertEquals("str", row.getString(7));
assertEquals(dateTime, row.getDateTime("f_datetime"));
assertEquals(dateTime, row.getDateTime(8));
assertEquals(false, row.getBoolean("f_boolean"));
assertEquals(false, row.getBoolean(9));
}
@Test
public void testCreatesNestedRow() {
Schema nestedType =
Stream.of(Schema.Field.of("f1_str", Schema.FieldType.STRING)).collect(toSchema());
Schema type =
Stream.of(
Schema.Field.of("f_int", Schema.FieldType.INT32),
Schema.Field.of("nested", Schema.FieldType.row(nestedType)))
.collect(toSchema());
Row nestedRow = Row.withSchema(nestedType).addValues("foobar").build();
Row row = Row.withSchema(type).addValues(42, nestedRow).build();
assertEquals((int) 42, (Object) row.getInt32("f_int"));
assertEquals("foobar", row.getRow("nested").getString("f1_str"));
}
@Test
public void testCreatesArray() {
List<Integer> data = Lists.newArrayList(2, 3, 5, 7);
Schema type =
Stream.of(Schema.Field.of("array", Schema.FieldType.array(Schema.FieldType.INT32)))
.collect(toSchema());
Row row = Row.withSchema(type).addArray(data).build();
assertEquals(data, row.getArray("array"));
}
@Test
public void testCreatesAndComparesNullArray() {
List<Integer> data = null;
Schema type =
Stream.of(Schema.Field.nullable("array", Schema.FieldType.array(Schema.FieldType.INT32)))
.collect(toSchema());
Row row = Row.withSchema(type).addArray(data).build();
assertEquals(data, row.getArray("array"));
Row otherNonNull = Row.withSchema(type).addValue(ImmutableList.of(1, 2, 3)).build();
Row otherNull = Row.withSchema(type).addValue(null).build();
assertNotEquals(otherNonNull, row);
assertEquals(otherNull, row);
}
@Test
public void testCreatesArrayWithNullElement() {
List<Integer> data = Lists.newArrayList(2, null, 5, null);
Schema type =
Stream.of(Schema.Field.of("array", Schema.FieldType.array(Schema.FieldType.INT32, true)))
.collect(toSchema());
Row row = Row.withSchema(type).addArray(data).build();
assertEquals(data, row.getArray("array"));
}
@Test
public void testCreatesRowArray() {
Schema nestedType = Stream.of(Schema.Field.of("f1_str", FieldType.STRING)).collect(toSchema());
List<Row> data =
Lists.newArrayList(
Row.withSchema(nestedType).addValues("one").build(),
Row.withSchema(nestedType).addValues("two").build(),
Row.withSchema(nestedType).addValues("three").build());
Schema type =
Stream.of(Schema.Field.of("array", FieldType.array(FieldType.row(nestedType))))
.collect(toSchema());
Row row = Row.withSchema(type).addArray(data).build();
assertEquals(data, row.getArray("array"));
}
@Test
public void testCreatesArrayArray() {
List<List<Integer>> data = Lists.<List<Integer>>newArrayList(Lists.newArrayList(1, 2, 3, 4));
Schema type =
Stream.of(Schema.Field.of("array", FieldType.array(FieldType.array(FieldType.INT32))))
.collect(toSchema());
Row row = Row.withSchema(type).addArray(data).build();
assertEquals(data, row.getArray("array"));
}
@Test
public void testCreatesArrayArrayWithNullElement() {
List<List<Integer>> data =
Lists.<List<Integer>>newArrayList(Lists.newArrayList(1, null, 3, null), null);
Schema type =
Stream.of(
Schema.Field.of(
"array", FieldType.array(FieldType.array(FieldType.INT32, true), true)))
.collect(toSchema());
Row row = Row.withSchema(type).addArray(data).build();
assertEquals(data, row.getArray("array"));
}
@Test
public void testCreatesArrayOfMap() {
List<Map<Integer, String>> data =
ImmutableList.<Map<Integer, String>>builder()
.add(ImmutableMap.of(1, "value1"))
.add(ImmutableMap.of(2, "value2"))
.build();
Schema type =
Stream.of(
Schema.Field.of(
"array", FieldType.array(FieldType.map(FieldType.INT32, FieldType.STRING))))
.collect(toSchema());
Row row = Row.withSchema(type).addArray(data).build();
assertEquals(data, row.getArray("array"));
}
@Test
public void testCreateMapWithPrimitiveValue() {
Map<Integer, String> data =
ImmutableMap.<Integer, String>builder()
.put(1, "value1")
.put(2, "value2")
.put(3, "value3")
.put(4, "value4")
.build();
Schema type =
Stream.of(Schema.Field.of("map", FieldType.map(FieldType.INT32, FieldType.STRING)))
.collect(toSchema());
Row row = Row.withSchema(type).addValue(data).build();
assertEquals(data, row.getMap("map"));
}
@Test
public void testCreateAndCompareNullMap() {
List<Integer> data = null;
Schema type =
Stream.of(Schema.Field.nullable("map", FieldType.map(FieldType.INT32, FieldType.STRING)))
.collect(toSchema());
Row row = Row.withSchema(type).addValue(data).build();
assertEquals(data, row.getArray("map"));
Row otherNonNull = Row.withSchema(type).addValue(ImmutableMap.of(1, "value1")).build();
Row otherNull = Row.withSchema(type).addValue(null).build();
assertNotEquals(otherNonNull, row);
assertEquals(otherNull, row);
}
@Test
public void testCreateMapWithNullValue() {
Map<Integer, String> data = new HashMap();
data.put(1, "value1");
data.put(2, "value2");
data.put(3, null);
data.put(4, null);
Schema type =
Stream.of(Schema.Field.of("map", FieldType.map(FieldType.INT32, FieldType.STRING, true)))
.collect(toSchema());
Row row = Row.withSchema(type).addValue(data).build();
assertEquals(data, row.getMap("map"));
}
@Test
public void testCreateMapWithArrayValue() {
Map<Integer, List<String>> data =
ImmutableMap.<Integer, List<String>>builder()
.put(1, Arrays.asList("value1"))
.put(2, Arrays.asList("value2"))
.build();
Schema type =
Stream.of(
Schema.Field.of(
"map", FieldType.map(FieldType.INT32, FieldType.array(FieldType.STRING))))
.collect(toSchema());
Row row = Row.withSchema(type).addValue(data).build();
assertEquals(data, row.getMap("map"));
}
@Test
public void testCreateMapWithMapValue() {
Map<Integer, Map<Integer, String>> data =
ImmutableMap.<Integer, Map<Integer, String>>builder()
.put(1, ImmutableMap.of(1, "value1"))
.put(2, ImmutableMap.of(2, "value2"))
.build();
Schema type =
Stream.of(
Schema.Field.of(
"map",
FieldType.map(
FieldType.INT32, FieldType.map(FieldType.INT32, FieldType.STRING))))
.collect(toSchema());
Row row = Row.withSchema(type).addValue(data).build();
assertEquals(data, row.getMap("map"));
}
@Test
public void testCreateMapWithMapValueWithNull() {
Map<Integer, Map<Integer, String>> data = new HashMap();
Map<Integer, String> innerData = new HashMap();
innerData.put(11, null);
innerData.put(12, "value3");
data.put(1, ImmutableMap.of(1, "value1"));
data.put(2, ImmutableMap.of(2, "value2"));
data.put(3, null);
data.put(4, innerData);
Schema type =
Stream.of(
Schema.Field.of(
"map",
FieldType.map(
FieldType.INT32,
FieldType.map(FieldType.INT32, FieldType.STRING, true),
true)))
.collect(toSchema());
Row row = Row.withSchema(type).addValue(data).build();
assertEquals(data, row.getMap("map"));
}
@Test
public void testCreateMapWithRowValue() {
Schema nestedType = Stream.of(Schema.Field.of("f1_str", FieldType.STRING)).collect(toSchema());
Map<Integer, Row> data =
ImmutableMap.<Integer, Row>builder()
.put(1, Row.withSchema(nestedType).addValues("one").build())
.put(2, Row.withSchema(nestedType).addValues("two").build())
.build();
Schema type =
Stream.of(Schema.Field.of("map", FieldType.map(FieldType.INT32, FieldType.row(nestedType))))
.collect(toSchema());
Row row = Row.withSchema(type).addValue(data).build();
assertEquals(data, row.getMap("map"));
}
@Test
public void testCollector() {
Schema type =
Stream.of(
Schema.Field.of("f_int", FieldType.INT32),
Schema.Field.of("f_str", FieldType.STRING),
Schema.Field.of("f_double", FieldType.DOUBLE))
.collect(toSchema());
Row row = Stream.of(1, "2", 3.0d).collect(toRow(type));
assertEquals(1, row.<Object>getValue("f_int"));
assertEquals("2", row.getValue("f_str"));
assertEquals(3.0d, row.<Object>getValue("f_double"));
}
@Test
public void testThrowsForIncorrectNumberOfFields() {
Schema type =
Stream.of(
Schema.Field.of("f_int", FieldType.INT32),
Schema.Field.of("f_str", FieldType.STRING),
Schema.Field.of("f_double", FieldType.DOUBLE))
.collect(toSchema());
thrown.expect(IllegalArgumentException.class);
Row.withSchema(type).addValues(1, "2").build();
}
@Test
public void testByteArrayEquality() {
byte[] a0 = new byte[] {1, 2, 3, 4};
byte[] b0 = new byte[] {1, 2, 3, 4};
Schema schema = Schema.of(Schema.Field.of("bytes", Schema.FieldType.BYTES));
Row a = Row.withSchema(schema).addValue(a0).build();
Row b = Row.withSchema(schema).addValue(b0).build();
Assert.assertEquals(a, b);
}
@Test
public void testByteBufferEquality() {
byte[] a0 = new byte[] {1, 2, 3, 4};
byte[] b0 = new byte[] {1, 2, 3, 4};
Schema schema = Schema.of(Schema.Field.of("bytes", Schema.FieldType.BYTES));
Row a = Row.withSchema(schema).addValue(ByteBuffer.wrap(a0)).build();
Row b = Row.withSchema(schema).addValue(ByteBuffer.wrap(b0)).build();
Assert.assertEquals(a, b);
}
}