blob: 8d7118017d64c3ae6c7cb0ebf4cab103aef303d9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.schemas;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import com.google.auto.value.AutoValue;
import java.time.Instant;
import java.util.Collection;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.avro.reflect.AvroSchema;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.schemas.Schema.Field;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.logicaltypes.NanosInstant;
import org.apache.beam.sdk.schemas.utils.SchemaTestUtils;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.junit.runners.Parameterized;
/** Unit tests for {@link SchemaCoder}. */
@RunWith(Enclosed.class)
public class SchemaCoderTest {
/** Row schema with two INT32 fields named "a" and "b". */
private static final Schema INT32_SCHEMA =
    Schema.of(Field.of("a", FieldType.INT32), Field.of("b", FieldType.INT32));

/** Row schema with a single "logicalNanos" field backed by the {@link NanosInstant} logical type. */
public static final Schema LOGICAL_NANOS_SCHEMA =
    Schema.of(
        Field.of("logicalNanos", FieldType.logicalType(new NanosInstant())));

/** Row schema with one FLOAT field ("float") and one DOUBLE field ("double"). */
public static final Schema FLOATING_POINT_SCHEMA =
    Schema.of(
        Field.of("float", FieldType.FLOAT),
        Field.of("double", FieldType.DOUBLE));
/** Stand-alone (non-parameterized) tests. */
@RunWith(JUnit4.class)
public static class SingletonTests {
  /**
   * Coders built for two different Java types must not compare equal, even though the schemas
   * inferred for those types are equivalent.
   */
  @Test
  public void equals_sameSchemaDifferentType_returnsFalse() throws NoSuchSchemaException {
    SchemaCoder<?> fromAutoValue = coderFrom(TypeDescriptor.of(SimpleAutoValue.class));
    SchemaCoder<?> fromBean = coderFrom(TypeDescriptor.of(SimpleBean.class));

    // The two coders are distinct objects that must not be considered equal.
    assertNotEquals(fromAutoValue, fromBean);

    // The schemas are equivalent; what differs is the pair of row conversion functions.
    Schema autoValueSchema = fromAutoValue.getSchema();
    Schema beanSchema = fromBean.getSchema();
    SchemaTestUtils.assertSchemaEquivalent(autoValueSchema, beanSchema);

    assertNotEquals(fromAutoValue.getToRowFunction(), fromBean.getToRowFunction());
    assertNotEquals(fromAutoValue.getFromRowFunction(), fromBean.getFromRowFunction());
  }
}
/**
 * AutoValue test fixture annotated with {@code @DefaultSchema(AutoValueSchema.class)}. It exposes
 * four properties (string, int32, int64, datetime) through abstract getters.
 */
@AutoValue
@DefaultSchema(AutoValueSchema.class)
public abstract static class SimpleAutoValue {
public abstract String getString();
public abstract Integer getInt32();
public abstract Long getInt64();
public abstract DateTime getDatetime();
/** Creates an instance by delegating to the AutoValue-generated implementation class. */
public static SimpleAutoValue of(String string, Integer int32, Long int64, DateTime datetime) {
return new AutoValue_SchemaCoderTest_SimpleAutoValue(string, int32, int64, datetime);
}
}
/**
 * Mutable JavaBean test fixture annotated with {@code @DefaultSchema(JavaBeanSchema.class)}. It
 * carries four properties (string, int32, int64, datetime), each with a getter and a setter.
 */
@DefaultSchema(JavaBeanSchema.class)
private static class SimpleBean {
  private String string;
  private Integer int32;
  private Long int64;
  private DateTime datetime;

  public SimpleBean(String string, Integer int32, Long int64, DateTime datetime) {
    this.string = string;
    this.int32 = int32;
    this.int64 = int64;
    this.datetime = datetime;
  }

  public String getString() {
    return string;
  }

  public void setString(String string) {
    this.string = string;
  }

  public Integer getInt32() {
    return int32;
  }

  public void setInt32(Integer int32) {
    this.int32 = int32;
  }

  public Long getInt64() {
    return int64;
  }

  public void setInt64(Long int64) {
    this.int64 = int64;
  }

  public DateTime getDatetime() {
    return datetime;
  }

  public void setDatetime(DateTime datetime) {
    this.datetime = datetime;
  }

  /**
   * Field-by-field equality restricted to instances of exactly this class.
   *
   * <p>Uses {@link Objects#equals(Object, Object)} so that a null field yields a comparison
   * result instead of a {@link NullPointerException}, matching the null tolerance of
   * {@link #hashCode()}.
   */
  @Override
  public boolean equals(@Nullable Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    SimpleBean that = (SimpleBean) o;
    return Objects.equals(string, that.string)
        && Objects.equals(int32, that.int32)
        && Objects.equals(int64, that.int64)
        && Objects.equals(datetime, that.datetime);
  }

  /** Hash over the same four fields that {@link #equals(Object)} compares. */
  @Override
  public int hashCode() {
    return Objects.hash(string, int32, int64, datetime);
  }
}
/**
 * Plain-field test fixture annotated with {@code @DefaultSchema(JavaFieldSchema.class)}. Its four
 * values (string, int32, int64, datetime) are exposed as public fields.
 */
@DefaultSchema(JavaFieldSchema.class)
private static class SimplePojo {
  public String string;
  public Integer int32;
  public Long int64;
  public DateTime datetime;

  public SimplePojo(String string, Integer int32, Long int64, DateTime datetime) {
    this.string = string;
    this.int32 = int32;
    this.int64 = int64;
    this.datetime = datetime;
  }

  /**
   * Field-by-field equality restricted to instances of exactly this class.
   *
   * <p>Uses {@link Objects#equals(Object, Object)} so that a null field yields a comparison
   * result instead of a {@link NullPointerException}, matching the null tolerance of
   * {@link #hashCode()}.
   */
  @Override
  public boolean equals(@Nullable Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    SimplePojo that = (SimplePojo) o;
    return Objects.equals(string, that.string)
        && Objects.equals(int32, that.int32)
        && Objects.equals(int64, that.int64)
        && Objects.equals(datetime, that.datetime);
  }

  /** Hash over the same four fields that {@link #equals(Object)} compares. */
  @Override
  public int hashCode() {
    return Objects.hash(string, int32, int64, datetime);
  }
}
/**
 * Plain-field test fixture annotated with {@code @DefaultSchema(AvroRecordSchema.class)}. The
 * {@code datetime} field carries an explicit Avro schema declaring it as a long with the
 * {@code timestamp-millis} logical type.
 */
@DefaultSchema(AvroRecordSchema.class)
private static class SimpleAvro {
  public String string;
  public Integer int32;
  public Long int64;

  @AvroSchema("{\"type\": \"long\", \"logicalType\": \"timestamp-millis\"}")
  public DateTime datetime;

  public SimpleAvro(String string, Integer int32, Long int64, DateTime datetime) {
    this.string = string;
    this.int32 = int32;
    this.int64 = int64;
    this.datetime = datetime;
  }

  /**
   * Field-by-field equality restricted to instances of exactly this class.
   *
   * <p>Uses {@link Objects#equals(Object, Object)} so that a null field yields a comparison
   * result instead of a {@link NullPointerException}, matching the null tolerance of
   * {@link #hashCode()}.
   */
  @Override
  public boolean equals(@Nullable Object o) {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    SimpleAvro that = (SimpleAvro) o;
    return Objects.equals(string, that.string)
        && Objects.equals(int32, that.int32)
        && Objects.equals(int64, that.int64)
        && Objects.equals(datetime, that.datetime);
  }

  /** Hash over the same four fields that {@link #equals(Object)} compares. */
  @Override
  public int hashCode() {
    return Objects.hash(string, int32, int64, datetime);
  }
}
/** Default schema registry used to look up schemas and row conversion functions for fixtures. */
private static final SchemaRegistry REGISTRY = SchemaRegistry.createDefault();

/**
 * Builds a {@link SchemaCoder} for the given type from the schema, to-row function and from-row
 * function that {@link #REGISTRY} returns for it.
 *
 * <p>The method is generic rather than raw-typed so the element type flows from the descriptor to
 * the returned coder; callers that assign the result to a raw {@code SchemaCoder} still compile.
 *
 * @param typeDescriptor descriptor of the type to build a coder for
 * @return a coder assembled from the registry lookups for {@code typeDescriptor}
 * @throws NoSuchSchemaException propagated from the {@link #REGISTRY} lookups
 */
private static <T> SchemaCoder<T> coderFrom(TypeDescriptor<T> typeDescriptor)
    throws NoSuchSchemaException {
  return SchemaCoder.of(
      REGISTRY.getSchema(typeDescriptor),
      typeDescriptor,
      REGISTRY.getToRowFunction(typeDescriptor),
      REGISTRY.getFromRowFunction(typeDescriptor));
}
/** Coder property checks, executed once for every row returned by {@link #data()}. */
@RunWith(Parameterized.class)
public static class ParameterizedTests {
  /** Coder under test. */
  @Parameterized.Parameter(0)
  public SchemaCoder coder;

  /** Suppliers that each produce a fresh value to run through {@link #coder}. */
  @Parameterized.Parameter(1)
  public ImmutableList<Supplier<Object>> testValues;

  /** Whether {@link #coder} is expected to be deterministic. */
  @Parameterized.Parameter(2)
  public boolean expectDeterministic;

  /**
   * Parameter rows: a coder, the value suppliers to exercise it with, and the expected
   * determinism flag.
   */
  @Parameterized.Parameters(name = "{index}: coder = {0}")
  public static Collection<Object[]> data() throws NoSuchSchemaException {
    return ImmutableList.of(
        testCase(
            SchemaCoder.of(INT32_SCHEMA),
            true,
            () -> Row.withSchema(INT32_SCHEMA).addValues(9001, 9002).build(),
            () -> Row.withSchema(INT32_SCHEMA).addValues(3, 4).build()),
        testCase(
            coderFrom(TypeDescriptor.of(SimpleAutoValue.class)),
            true,
            () -> SimpleAutoValue.of("foo", 9001, 0L, march14At1030(1979)),
            () -> SimpleAutoValue.of("bar", 9002, 1L, march14At1030(1989))),
        testCase(
            coderFrom(TypeDescriptor.of(SimpleBean.class)),
            true,
            () -> new SimpleBean("foo", 9001, 0L, march14At1030(1979)),
            () -> new SimpleBean("bar", 9002, 1L, march14At1030(1989))),
        testCase(
            coderFrom(TypeDescriptor.of(SimplePojo.class)),
            true,
            () -> new SimplePojo("foo", 9001, 0L, march14At1030(1979)),
            () -> new SimplePojo("bar", 9002, 1L, march14At1030(1989))),
        testCase(
            coderFrom(TypeDescriptor.of(SimpleAvro.class)),
            true,
            () -> new SimpleAvro("foo", 9001, 0L, march14At1030(1979)),
            () -> new SimpleAvro("bar", 9002, 1L, march14At1030(1989))),
        testCase(
            RowCoder.of(LOGICAL_NANOS_SCHEMA),
            true,
            () ->
                Row.withSchema(LOGICAL_NANOS_SCHEMA)
                    .withFieldValue("logicalNanos", Instant.ofEpochMilli(9001))
                    .build()),
        testCase(
            RowCoder.of(FLOATING_POINT_SCHEMA),
            false,
            () ->
                Row.withSchema(FLOATING_POINT_SCHEMA)
                    .withFieldValue("float", 1.0f)
                    .withFieldValue("double", 2.0)
                    .build()));
  }

  /** Packs one parameter row in the positional order expected by the parameter fields. */
  @SafeVarargs
  private static Object[] testCase(
      SchemaCoder<?> coder, boolean expectDeterministic, Supplier<Object>... values) {
    return new Object[] {coder, ImmutableList.copyOf(values), expectDeterministic};
  }

  /** Returns a new {@link DateTime} set to March 14 of {@code year} at 10:30:00.000. */
  private static DateTime march14At1030(int year) {
    return new DateTime().withDate(year, 3, 14).withTime(10, 30, 0, 0);
  }

  @Test
  public void coderSerializable() {
    CoderProperties.coderSerializable(coder);
  }

  /** Checks every ordered pair of test values, including each value paired with itself. */
  @Test
  public void coderConsistentWithEquals() throws Exception {
    for (Supplier<Object> left : testValues) {
      for (Supplier<Object> right : testValues) {
        CoderProperties.coderConsistentWithEquals(coder, left.get(), right.get());
      }
    }
  }

  /**
   * For coders expected to be deterministic, checks determinism on two separately supplied
   * instances of each test value; otherwise checks that the coder reports non-determinism.
   */
  @Test
  public void verifyDeterministic() throws Exception {
    if (!expectDeterministic) {
      assertNonDeterministic(coder);
      return;
    }
    for (Supplier<Object> value : testValues) {
      CoderProperties.coderDeterministic(coder, value.get(), value.get());
    }
  }
}
/**
 * Asserts that {@code coder.verifyDeterministic()} throws a {@link NonDeterministicException}
 * carrying exactly one reason; fails the test if no exception is thrown.
 */
private static void assertNonDeterministic(SchemaCoder<?> coder) {
  NonDeterministicException thrown = null;
  try {
    coder.verifyDeterministic();
  } catch (NonDeterministicException e) {
    thrown = e;
  }
  if (thrown == null) {
    fail("Expected " + coder + " to be non-deterministic.");
  } else {
    assertThat(thrown.getReasons(), Matchers.iterableWithSize(1));
  }
}
}