blob: ef6d0296c6a95d080f47705f0dee21b5a221f1a7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
**/
package org.apache.kafka.connect.data;
import org.apache.kafka.connect.data.Schema.Type;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.errors.SchemaProjectorException;
import org.junit.Test;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public class SchemaProjectorTest {
@Test
public void testPrimitiveTypeProjection() throws Exception {
Object projected;
projected = SchemaProjector.project(Schema.BOOLEAN_SCHEMA, false, Schema.BOOLEAN_SCHEMA);
assertEquals(false, projected);
byte[] bytes = {(byte) 1, (byte) 2};
projected = SchemaProjector.project(Schema.BYTES_SCHEMA, bytes, Schema.BYTES_SCHEMA);
assertEquals(bytes, projected);
projected = SchemaProjector.project(Schema.STRING_SCHEMA, "abc", Schema.STRING_SCHEMA);
assertEquals("abc", projected);
projected = SchemaProjector.project(Schema.BOOLEAN_SCHEMA, false, Schema.OPTIONAL_BOOLEAN_SCHEMA);
assertEquals(false, projected);
projected = SchemaProjector.project(Schema.BYTES_SCHEMA, bytes, Schema.OPTIONAL_BYTES_SCHEMA);
assertEquals(bytes, projected);
projected = SchemaProjector.project(Schema.STRING_SCHEMA, "abc", Schema.OPTIONAL_STRING_SCHEMA);
assertEquals("abc", projected);
try {
SchemaProjector.project(Schema.OPTIONAL_BOOLEAN_SCHEMA, false, Schema.BOOLEAN_SCHEMA);
fail("Cannot project optional schema to schema with no default value.");
} catch (DataException e) {
// expected
}
try {
SchemaProjector.project(Schema.OPTIONAL_BYTES_SCHEMA, bytes, Schema.BYTES_SCHEMA);
fail("Cannot project optional schema to schema with no default value.");
} catch (DataException e) {
// expected
}
try {
SchemaProjector.project(Schema.OPTIONAL_STRING_SCHEMA, "abc", Schema.STRING_SCHEMA);
fail("Cannot project optional schema to schema with no default value.");
} catch (DataException e) {
// expected
}
}
@Test
public void testNumericTypeProjection() throws Exception {
Schema[] promotableSchemas = {Schema.INT8_SCHEMA, Schema.INT16_SCHEMA, Schema.INT32_SCHEMA, Schema.INT64_SCHEMA, Schema.FLOAT32_SCHEMA, Schema.FLOAT64_SCHEMA};
Schema[] promotableOptionalSchemas = {Schema.OPTIONAL_INT8_SCHEMA, Schema.OPTIONAL_INT16_SCHEMA, Schema.OPTIONAL_INT32_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA,
Schema.OPTIONAL_FLOAT32_SCHEMA, Schema.OPTIONAL_FLOAT64_SCHEMA};
Object[] values = {(byte) 127, (short) 255, 32767, 327890L, 1.2F, 1.2345};
Map<Object, List<?>> expectedProjected = new HashMap<>();
expectedProjected.put(values[0], Arrays.asList((byte) 127, (short) 127, 127, 127L, 127.F, 127.));
expectedProjected.put(values[1], Arrays.asList((short) 255, 255, 255L, 255.F, 255.));
expectedProjected.put(values[2], Arrays.asList(32767, 32767L, 32767.F, 32767.));
expectedProjected.put(values[3], Arrays.asList(327890L, 327890.F, 327890.));
expectedProjected.put(values[4], Arrays.asList(1.2F, 1.2));
expectedProjected.put(values[5], Arrays.asList(1.2345));
Object promoted;
for (int i = 0; i < promotableSchemas.length; ++i) {
Schema source = promotableSchemas[i];
List<?> expected = expectedProjected.get(values[i]);
for (int j = i; j < promotableSchemas.length; ++j) {
Schema target = promotableSchemas[j];
promoted = SchemaProjector.project(source, values[i], target);
if (target.type() == Type.FLOAT64) {
assertEquals((Double) (expected.get(j - i)), (double) promoted, 1e-6);
} else {
assertEquals(expected.get(j - i), promoted);
}
}
for (int j = i; j < promotableOptionalSchemas.length; ++j) {
Schema target = promotableOptionalSchemas[j];
promoted = SchemaProjector.project(source, values[i], target);
if (target.type() == Type.FLOAT64) {
assertEquals((Double) (expected.get(j - i)), (double) promoted, 1e-6);
} else {
assertEquals(expected.get(j - i), promoted);
}
}
}
for (int i = 0; i < promotableOptionalSchemas.length; ++i) {
Schema source = promotableSchemas[i];
List<?> expected = expectedProjected.get(values[i]);
for (int j = i; j < promotableOptionalSchemas.length; ++j) {
Schema target = promotableOptionalSchemas[j];
promoted = SchemaProjector.project(source, values[i], target);
if (target.type() == Type.FLOAT64) {
assertEquals((Double) (expected.get(j - i)), (double) promoted, 1e-6);
} else {
assertEquals(expected.get(j - i), promoted);
}
}
}
Schema[] nonPromotableSchemas = {Schema.BOOLEAN_SCHEMA, Schema.BYTES_SCHEMA, Schema.STRING_SCHEMA};
for (Schema promotableSchema: promotableSchemas) {
for (Schema nonPromotableSchema: nonPromotableSchemas) {
Object dummy = new Object();
try {
SchemaProjector.project(promotableSchema, dummy, nonPromotableSchema);
fail("Cannot promote " + promotableSchema.type() + " to " + nonPromotableSchema.type());
} catch (DataException e) {
// expected
}
}
}
}
@Test
public void testPrimitiveOptionalProjection() throws Exception {
verifyOptionalProjection(Schema.OPTIONAL_BOOLEAN_SCHEMA, Type.BOOLEAN, false, true, false, true);
verifyOptionalProjection(Schema.OPTIONAL_BOOLEAN_SCHEMA, Type.BOOLEAN, false, true, false, false);
byte[] bytes = {(byte) 1, (byte) 2};
byte[] defaultBytes = {(byte) 3, (byte) 4};
verifyOptionalProjection(Schema.OPTIONAL_BYTES_SCHEMA, Type.BYTES, bytes, defaultBytes, bytes, true);
verifyOptionalProjection(Schema.OPTIONAL_BYTES_SCHEMA, Type.BYTES, bytes, defaultBytes, bytes, false);
verifyOptionalProjection(Schema.OPTIONAL_STRING_SCHEMA, Type.STRING, "abc", "def", "abc", true);
verifyOptionalProjection(Schema.OPTIONAL_STRING_SCHEMA, Type.STRING, "abc", "def", "abc", false);
verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT8, (byte) 12, (byte) 127, (byte) 12, true);
verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT8, (byte) 12, (byte) 127, (byte) 12, false);
verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT16, (byte) 12, (short) 127, (short) 12, true);
verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT16, (byte) 12, (short) 127, (short) 12, false);
verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT32, (byte) 12, 12789, 12, true);
verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT32, (byte) 12, 12789, 12, false);
verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT64, (byte) 12, 127890L, 12L, true);
verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.INT64, (byte) 12, 127890L, 12L, false);
verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT32, (byte) 12, 3.45F, 12.F, true);
verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT32, (byte) 12, 3.45F, 12.F, false);
verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT64, (byte) 12, 3.4567, 12., true);
verifyOptionalProjection(Schema.OPTIONAL_INT8_SCHEMA, Type.FLOAT64, (byte) 12, 3.4567, 12., false);
verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT16, (short) 12, (short) 127, (short) 12, true);
verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT16, (short) 12, (short) 127, (short) 12, false);
verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT32, (short) 12, 12789, 12, true);
verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT32, (short) 12, 12789, 12, false);
verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT64, (short) 12, 127890L, 12L, true);
verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.INT64, (short) 12, 127890L, 12L, false);
verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT32, (short) 12, 3.45F, 12.F, true);
verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT32, (short) 12, 3.45F, 12.F, false);
verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT64, (short) 12, 3.4567, 12., true);
verifyOptionalProjection(Schema.OPTIONAL_INT16_SCHEMA, Type.FLOAT64, (short) 12, 3.4567, 12., false);
verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT32, 12, 12789, 12, true);
verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT32, 12, 12789, 12, false);
verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT64, 12, 127890L, 12L, true);
verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.INT64, 12, 127890L, 12L, false);
verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT32, 12, 3.45F, 12.F, true);
verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT32, 12, 3.45F, 12.F, false);
verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT64, 12, 3.4567, 12., true);
verifyOptionalProjection(Schema.OPTIONAL_INT32_SCHEMA, Type.FLOAT64, 12, 3.4567, 12., false);
verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.INT64, 12L, 127890L, 12L, true);
verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.INT64, 12L, 127890L, 12L, false);
verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT32, 12L, 3.45F, 12.F, true);
verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT32, 12L, 3.45F, 12.F, false);
verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT64, 12L, 3.4567, 12., true);
verifyOptionalProjection(Schema.OPTIONAL_INT64_SCHEMA, Type.FLOAT64, 12L, 3.4567, 12., false);
verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT32, 12.345F, 3.45F, 12.345F, true);
verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT32, 12.345F, 3.45F, 12.345F, false);
verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT64, 12.345F, 3.4567, 12.345, true);
verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT64, 12.345F, 3.4567, 12.345, false);
verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT64, 12.345, 3.4567, 12.345, true);
verifyOptionalProjection(Schema.OPTIONAL_FLOAT32_SCHEMA, Type.FLOAT64, 12.345, 3.4567, 12.345, false);
}
@Test
public void testStructAddField() throws Exception {
Schema source = SchemaBuilder.struct()
.field("field", Schema.INT32_SCHEMA)
.build();
Struct sourceStruct = new Struct(source);
sourceStruct.put("field", 1);
Schema target = SchemaBuilder.struct()
.field("field", Schema.INT32_SCHEMA)
.field("field2", SchemaBuilder.int32().defaultValue(123).build())
.build();
Struct targetStruct = (Struct) SchemaProjector.project(source, sourceStruct, target);
assertEquals(1, (int) targetStruct.getInt32("field"));
assertEquals(123, (int) targetStruct.getInt32("field2"));
Schema incompatibleTargetSchema = SchemaBuilder.struct()
.field("field", Schema.INT32_SCHEMA)
.field("field2", Schema.INT32_SCHEMA)
.build();
try {
SchemaProjector.project(source, sourceStruct, incompatibleTargetSchema);
fail("Incompatible schema.");
} catch (DataException e) {
// expected
}
}
@Test
public void testStructRemoveField() throws Exception {
Schema source = SchemaBuilder.struct()
.field("field", Schema.INT32_SCHEMA)
.field("field2", Schema.INT32_SCHEMA)
.build();
Struct sourceStruct = new Struct(source);
sourceStruct.put("field", 1);
sourceStruct.put("field2", 234);
Schema target = SchemaBuilder.struct()
.field("field", Schema.INT32_SCHEMA)
.build();
Struct targetStruct = (Struct) SchemaProjector.project(source, sourceStruct, target);
assertEquals(1, targetStruct.get("field"));
try {
targetStruct.get("field2");
fail("field2 is not part of the projected struct");
} catch (DataException e) {
// expected
}
}
@Test
public void testStructDefaultValue() throws Exception {
Schema source = SchemaBuilder.struct().optional()
.field("field", Schema.INT32_SCHEMA)
.field("field2", Schema.INT32_SCHEMA)
.build();
SchemaBuilder builder = SchemaBuilder.struct()
.field("field", Schema.INT32_SCHEMA)
.field("field2", Schema.INT32_SCHEMA);
Struct defaultStruct = new Struct(builder).put("field", 12).put("field2", 345);
builder.defaultValue(defaultStruct);
Schema target = builder.build();
Object projected = SchemaProjector.project(source, null, target);
assertEquals(defaultStruct, projected);
Struct sourceStruct = new Struct(source).put("field", 45).put("field2", 678);
Struct targetStruct = (Struct) SchemaProjector.project(source, sourceStruct, target);
assertEquals(sourceStruct.get("field"), targetStruct.get("field"));
assertEquals(sourceStruct.get("field2"), targetStruct.get("field2"));
}
@Test
public void testNestedSchemaProjection() throws Exception {
Schema sourceFlatSchema = SchemaBuilder.struct()
.field("field", Schema.INT32_SCHEMA)
.build();
Schema targetFlatSchema = SchemaBuilder.struct()
.field("field", Schema.INT32_SCHEMA)
.field("field2", SchemaBuilder.int32().defaultValue(123).build())
.build();
Schema sourceNestedSchema = SchemaBuilder.struct()
.field("first", Schema.INT32_SCHEMA)
.field("second", Schema.STRING_SCHEMA)
.field("array", SchemaBuilder.array(Schema.INT32_SCHEMA).build())
.field("map", SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build())
.field("nested", sourceFlatSchema)
.build();
Schema targetNestedSchema = SchemaBuilder.struct()
.field("first", Schema.INT32_SCHEMA)
.field("second", Schema.STRING_SCHEMA)
.field("array", SchemaBuilder.array(Schema.INT32_SCHEMA).build())
.field("map", SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build())
.field("nested", targetFlatSchema)
.build();
Struct sourceFlatStruct = new Struct(sourceFlatSchema);
sourceFlatStruct.put("field", 113);
Struct sourceNestedStruct = new Struct(sourceNestedSchema);
sourceNestedStruct.put("first", 1);
sourceNestedStruct.put("second", "abc");
sourceNestedStruct.put("array", Arrays.asList(1, 2));
sourceNestedStruct.put("map", Collections.singletonMap(5, "def"));
sourceNestedStruct.put("nested", sourceFlatStruct);
Struct targetNestedStruct = (Struct) SchemaProjector.project(sourceNestedSchema, sourceNestedStruct,
targetNestedSchema);
assertEquals(1, targetNestedStruct.get("first"));
assertEquals("abc", targetNestedStruct.get("second"));
assertEquals(Arrays.asList(1, 2), (List<Integer>) targetNestedStruct.get("array"));
assertEquals(Collections.singletonMap(5, "def"), (Map<Integer, String>) targetNestedStruct.get("map"));
Struct projectedStruct = (Struct) targetNestedStruct.get("nested");
assertEquals(113, projectedStruct.get("field"));
assertEquals(123, projectedStruct.get("field2"));
}
@Test
public void testLogicalTypeProjection() throws Exception {
Schema[] logicalTypeSchemas = {Decimal.schema(2), Date.SCHEMA, Time.SCHEMA, Timestamp.SCHEMA};
Object projected;
BigDecimal testDecimal = new BigDecimal(new BigInteger("156"), 2);
projected = SchemaProjector.project(Decimal.schema(2), testDecimal, Decimal.schema(2));
assertEquals(testDecimal, projected);
projected = SchemaProjector.project(Date.SCHEMA, 1000, Date.SCHEMA);
assertEquals(1000, projected);
projected = SchemaProjector.project(Time.SCHEMA, 231, Time.SCHEMA);
assertEquals(231, projected);
projected = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA);
assertEquals(34567L, projected);
java.util.Date date = new java.util.Date();
projected = SchemaProjector.project(Date.SCHEMA, date, Date.SCHEMA);
assertEquals(date, projected);
projected = SchemaProjector.project(Time.SCHEMA, date, Time.SCHEMA);
assertEquals(date, projected);
projected = SchemaProjector.project(Timestamp.SCHEMA, date, Timestamp.SCHEMA);
assertEquals(date, projected);
Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build();
for (Schema logicalTypeSchema: logicalTypeSchemas) {
try {
SchemaProjector.project(logicalTypeSchema, null, Schema.BOOLEAN_SCHEMA);
fail("Cannot project logical types to non-logical types.");
} catch (SchemaProjectorException e) {
// expected
}
try {
SchemaProjector.project(logicalTypeSchema, null, namedSchema);
fail("Reader name is not a valid logical type name.");
} catch (SchemaProjectorException e) {
// expected
}
try {
SchemaProjector.project(Schema.BOOLEAN_SCHEMA, null, logicalTypeSchema);
fail("Cannot project non-logical types to logical types.");
} catch (SchemaProjectorException e) {
// expected
}
}
}
@Test
public void testArrayProjection() throws Exception {
Schema source = SchemaBuilder.array(Schema.INT32_SCHEMA).build();
Object projected = SchemaProjector.project(source, Arrays.asList(1, 2, 3), source);
assertEquals(Arrays.asList(1, 2, 3), (List<Integer>) projected);
Schema optionalSource = SchemaBuilder.array(Schema.INT32_SCHEMA).optional().build();
Schema target = SchemaBuilder.array(Schema.INT32_SCHEMA).defaultValue(Arrays.asList(1, 2, 3)).build();
projected = SchemaProjector.project(optionalSource, Arrays.asList(4, 5), target);
assertEquals(Arrays.asList(4, 5), (List<Integer>) projected);
projected = SchemaProjector.project(optionalSource, null, target);
assertEquals(Arrays.asList(1, 2, 3), (List<Integer>) projected);
Schema promotedTarget = SchemaBuilder.array(Schema.INT64_SCHEMA).defaultValue(Arrays.asList(1L, 2L, 3L)).build();
projected = SchemaProjector.project(optionalSource, Arrays.asList(4, 5), promotedTarget);
List<Long> expectedProjected = Arrays.asList(4L, 5L);
assertEquals(expectedProjected, (List<Long>) projected);
projected = SchemaProjector.project(optionalSource, null, promotedTarget);
assertEquals(Arrays.asList(1L, 2L, 3L), (List<Long>) projected);
Schema noDefaultValueTarget = SchemaBuilder.array(Schema.INT32_SCHEMA).build();
try {
SchemaProjector.project(optionalSource, null, noDefaultValueTarget);
fail("Target schema does not provide a default value.");
} catch (SchemaProjectorException e) {
// expected
}
Schema nonPromotableTarget = SchemaBuilder.array(Schema.BOOLEAN_SCHEMA).build();
try {
SchemaProjector.project(optionalSource, null, nonPromotableTarget);
fail("Neither source type matches target type nor source type can be promoted to target type");
} catch (SchemaProjectorException e) {
// expected
}
}
@Test
public void testMapProjection() throws Exception {
Schema source = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).optional().build();
Schema target = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).defaultValue(Collections.singletonMap(1, 2)).build();
Object projected = SchemaProjector.project(source, Collections.singletonMap(3, 4), target);
assertEquals(Collections.singletonMap(3, 4), (Map<Integer, Integer>) projected);
projected = SchemaProjector.project(source, null, target);
assertEquals(Collections.singletonMap(1, 2), (Map<Integer, Integer>) projected);
Schema promotedTarget = SchemaBuilder.map(Schema.INT64_SCHEMA, Schema.FLOAT32_SCHEMA).defaultValue(
Collections.singletonMap(3L, 4.5F)).build();
projected = SchemaProjector.project(source, Collections.singletonMap(3, 4), promotedTarget);
assertEquals(Collections.singletonMap(3L, 4.F), (Map<Long, Float>) projected);
projected = SchemaProjector.project(source, null, promotedTarget);
assertEquals(Collections.singletonMap(3L, 4.5F), (Map<Long, Float>) projected);
Schema noDefaultValueTarget = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build();
try {
SchemaProjector.project(source, null, noDefaultValueTarget);
fail("Reader does not provide a default value.");
} catch (SchemaProjectorException e) {
// expected
}
Schema nonPromotableTarget = SchemaBuilder.map(Schema.BOOLEAN_SCHEMA, Schema.STRING_SCHEMA).build();
try {
SchemaProjector.project(source, null, nonPromotableTarget);
fail("Neither source type matches target type nor source type can be promoted to target type");
} catch (SchemaProjectorException e) {
// expected
}
}
@Test
public void testMaybeCompatible() throws Exception {
Schema source = SchemaBuilder.int32().name("source").build();
Schema target = SchemaBuilder.int32().name("target").build();
try {
SchemaProjector.project(source, 12, target);
fail("Source name and target name mismatch.");
} catch (SchemaProjectorException e) {
// expected
}
Schema targetWithParameters = SchemaBuilder.int32().parameters(Collections.singletonMap("key", "value"));
try {
SchemaProjector.project(source, 34, targetWithParameters);
fail("Source parameters and target parameters mismatch.");
} catch (SchemaProjectorException e) {
// expected
}
}
@Test
public void testProjectMissingDefaultValuedStructField() {
final Schema source = SchemaBuilder.struct().build();
final Schema target = SchemaBuilder.struct().field("id", SchemaBuilder.int64().defaultValue(42L).build()).build();
assertEquals(42L, (long) ((Struct) SchemaProjector.project(source, new Struct(source), target)).getInt64("id"));
}
@Test
public void testProjectMissingOptionalStructField() {
final Schema source = SchemaBuilder.struct().build();
final Schema target = SchemaBuilder.struct().field("id", SchemaBuilder.OPTIONAL_INT64_SCHEMA).build();
assertEquals(null, ((Struct) SchemaProjector.project(source, new Struct(source), target)).getInt64("id"));
}
@Test(expected = SchemaProjectorException.class)
public void testProjectMissingRequiredField() {
final Schema source = SchemaBuilder.struct().build();
final Schema target = SchemaBuilder.struct().field("id", SchemaBuilder.INT64_SCHEMA).build();
SchemaProjector.project(source, new Struct(source), target);
}
private void verifyOptionalProjection(Schema source, Type targetType, Object value, Object defaultValue, Object expectedProjected, boolean optional) {
Schema target;
assert source.isOptional();
assert value != null;
if (optional) {
target = SchemaBuilder.type(targetType).optional().defaultValue(defaultValue).build();
} else {
target = SchemaBuilder.type(targetType).defaultValue(defaultValue).build();
}
Object projected = SchemaProjector.project(source, value, target);
if (targetType == Type.FLOAT64) {
assertEquals((double) expectedProjected, (double) projected, 1e-6);
} else {
assertEquals(expectedProjected, projected);
}
projected = SchemaProjector.project(source, null, target);
if (optional) {
assertEquals(null, projected);
} else {
assertEquals(defaultValue, projected);
}
}
}