blob: e7dfa4c8e365329971498f187e7b9b8547b46d68 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.data;
import org.apache.kafka.connect.errors.DataException;
import org.junit.Test;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
public class ConnectSchemaTest {
private static final Schema MAP_INT_STRING_SCHEMA = SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build();
private static final Schema FLAT_STRUCT_SCHEMA = SchemaBuilder.struct()
.field("field", Schema.INT32_SCHEMA)
.build();
private static final Schema STRUCT_SCHEMA = SchemaBuilder.struct()
.field("first", Schema.INT32_SCHEMA)
.field("second", Schema.STRING_SCHEMA)
.field("array", SchemaBuilder.array(Schema.INT32_SCHEMA).build())
.field("map", SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build())
.field("nested", FLAT_STRUCT_SCHEMA)
.build();
private static final Schema PARENT_STRUCT_SCHEMA = SchemaBuilder.struct()
.field("nested", FLAT_STRUCT_SCHEMA)
.build();
@Test
public void testFieldsOnStructSchema() {
Schema schema = SchemaBuilder.struct()
.field("foo", Schema.BOOLEAN_SCHEMA)
.field("bar", Schema.INT32_SCHEMA)
.build();
assertEquals(2, schema.fields().size());
// Validate field lookup by name
Field foo = schema.field("foo");
assertEquals(0, foo.index());
Field bar = schema.field("bar");
assertEquals(1, bar.index());
// Any other field name should fail
assertNull(schema.field("other"));
}
@Test(expected = DataException.class)
public void testFieldsOnlyValidForStructs() {
Schema.INT8_SCHEMA.fields();
}
@Test
public void testValidateValueMatchingType() {
ConnectSchema.validateValue(Schema.INT8_SCHEMA, (byte) 1);
ConnectSchema.validateValue(Schema.INT16_SCHEMA, (short) 1);
ConnectSchema.validateValue(Schema.INT32_SCHEMA, 1);
ConnectSchema.validateValue(Schema.INT64_SCHEMA, (long) 1);
ConnectSchema.validateValue(Schema.FLOAT32_SCHEMA, 1.f);
ConnectSchema.validateValue(Schema.FLOAT64_SCHEMA, 1.);
ConnectSchema.validateValue(Schema.BOOLEAN_SCHEMA, true);
ConnectSchema.validateValue(Schema.STRING_SCHEMA, "a string");
ConnectSchema.validateValue(Schema.BYTES_SCHEMA, "a byte array".getBytes());
ConnectSchema.validateValue(Schema.BYTES_SCHEMA, ByteBuffer.wrap("a byte array".getBytes()));
ConnectSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, 3));
ConnectSchema.validateValue(
SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.STRING_SCHEMA).build(),
Collections.singletonMap(1, "value")
);
// Struct tests the basic struct layout + complex field types + nested structs
Struct structValue = new Struct(STRUCT_SCHEMA)
.put("first", 1)
.put("second", "foo")
.put("array", Arrays.asList(1, 2, 3))
.put("map", Collections.singletonMap(1, "value"))
.put("nested", new Struct(FLAT_STRUCT_SCHEMA).put("field", 12));
ConnectSchema.validateValue(STRUCT_SCHEMA, structValue);
}
@Test
public void testValidateValueMatchingLogicalType() {
ConnectSchema.validateValue(Decimal.schema(2), new BigDecimal(new BigInteger("156"), 2));
ConnectSchema.validateValue(Date.SCHEMA, new java.util.Date(0));
ConnectSchema.validateValue(Time.SCHEMA, new java.util.Date(0));
ConnectSchema.validateValue(Timestamp.SCHEMA, new java.util.Date(0));
}
// To avoid requiring excessive numbers of tests, these checks for invalid types use a similar type where possible
// to only include a single test for each type
@Test(expected = DataException.class)
public void testValidateValueMismatchInt8() {
ConnectSchema.validateValue(Schema.INT8_SCHEMA, 1);
}
@Test(expected = DataException.class)
public void testValidateValueMismatchInt16() {
ConnectSchema.validateValue(Schema.INT16_SCHEMA, 1);
}
@Test(expected = DataException.class)
public void testValidateValueMismatchInt32() {
ConnectSchema.validateValue(Schema.INT32_SCHEMA, (long) 1);
}
@Test(expected = DataException.class)
public void testValidateValueMismatchInt64() {
ConnectSchema.validateValue(Schema.INT64_SCHEMA, 1);
}
@Test(expected = DataException.class)
public void testValidateValueMismatchFloat() {
ConnectSchema.validateValue(Schema.FLOAT32_SCHEMA, 1.0);
}
@Test(expected = DataException.class)
public void testValidateValueMismatchDouble() {
ConnectSchema.validateValue(Schema.FLOAT64_SCHEMA, 1.f);
}
@Test(expected = DataException.class)
public void testValidateValueMismatchBoolean() {
ConnectSchema.validateValue(Schema.BOOLEAN_SCHEMA, 1.f);
}
@Test(expected = DataException.class)
public void testValidateValueMismatchString() {
// CharSequence is a similar type (supertype of String), but we restrict to String.
CharBuffer cbuf = CharBuffer.wrap("abc");
ConnectSchema.validateValue(Schema.STRING_SCHEMA, cbuf);
}
@Test(expected = DataException.class)
public void testValidateValueMismatchBytes() {
ConnectSchema.validateValue(Schema.BYTES_SCHEMA, new Object[]{1, "foo"});
}
@Test(expected = DataException.class)
public void testValidateValueMismatchArray() {
ConnectSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList("a", "b", "c"));
}
@Test(expected = DataException.class)
public void testValidateValueMismatchArraySomeMatch() {
// Even if some match the right type, this should fail if any mismatch. In this case, type erasure loses
// the fact that the list is actually List<Object>, but we couldn't tell if only checking the first element
ConnectSchema.validateValue(SchemaBuilder.array(Schema.INT32_SCHEMA).build(), Arrays.asList(1, 2, "c"));
}
@Test(expected = DataException.class)
public void testValidateValueMismatchMapKey() {
ConnectSchema.validateValue(MAP_INT_STRING_SCHEMA, Collections.singletonMap("wrong key type", "value"));
}
@Test(expected = DataException.class)
public void testValidateValueMismatchMapValue() {
ConnectSchema.validateValue(MAP_INT_STRING_SCHEMA, Collections.singletonMap(1, 2));
}
@Test(expected = DataException.class)
public void testValidateValueMismatchMapSomeKeys() {
Map<Object, String> data = new HashMap<>();
data.put(1, "abc");
data.put("wrong", "it's as easy as one two three");
ConnectSchema.validateValue(MAP_INT_STRING_SCHEMA, data);
}
@Test(expected = DataException.class)
public void testValidateValueMismatchMapSomeValues() {
Map<Integer, Object> data = new HashMap<>();
data.put(1, "abc");
data.put(2, "wrong".getBytes());
ConnectSchema.validateValue(MAP_INT_STRING_SCHEMA, data);
}
@Test(expected = DataException.class)
public void testValidateValueMismatchStructWrongSchema() {
// Completely mismatching schemas
ConnectSchema.validateValue(
FLAT_STRUCT_SCHEMA,
new Struct(SchemaBuilder.struct().field("x", Schema.INT32_SCHEMA).build()).put("x", 1)
);
}
@Test(expected = DataException.class)
public void testValidateValueMismatchStructWrongNestedSchema() {
// Top-level schema matches, but nested does not.
ConnectSchema.validateValue(
PARENT_STRUCT_SCHEMA,
new Struct(PARENT_STRUCT_SCHEMA)
.put("nested", new Struct(SchemaBuilder.struct().field("x", Schema.INT32_SCHEMA).build()).put("x", 1))
);
}
@Test(expected = DataException.class)
public void testValidateValueMismatchDecimal() {
ConnectSchema.validateValue(Decimal.schema(2), new BigInteger("156"));
}
@Test(expected = DataException.class)
public void testValidateValueMismatchDate() {
ConnectSchema.validateValue(Date.SCHEMA, 1000L);
}
@Test(expected = DataException.class)
public void testValidateValueMismatchTime() {
ConnectSchema.validateValue(Time.SCHEMA, 1000L);
}
@Test(expected = DataException.class)
public void testValidateValueMismatchTimestamp() {
ConnectSchema.validateValue(Timestamp.SCHEMA, 1000L);
}
@Test
public void testPrimitiveEquality() {
// Test that primitive types, which only need to consider all the type & metadata fields, handle equality correctly
ConnectSchema s1 = new ConnectSchema(Schema.Type.INT8, false, null, "name", 2, "doc");
ConnectSchema s2 = new ConnectSchema(Schema.Type.INT8, false, null, "name", 2, "doc");
ConnectSchema differentType = new ConnectSchema(Schema.Type.INT16, false, null, "name", 2, "doc");
ConnectSchema differentOptional = new ConnectSchema(Schema.Type.INT8, true, null, "name", 2, "doc");
ConnectSchema differentDefault = new ConnectSchema(Schema.Type.INT8, false, true, "name", 2, "doc");
ConnectSchema differentName = new ConnectSchema(Schema.Type.INT8, false, null, "otherName", 2, "doc");
ConnectSchema differentVersion = new ConnectSchema(Schema.Type.INT8, false, null, "name", 4, "doc");
ConnectSchema differentDoc = new ConnectSchema(Schema.Type.INT8, false, null, "name", 2, "other doc");
ConnectSchema differentParameters = new ConnectSchema(Schema.Type.INT8, false, null, "name", 2, "doc", Collections.singletonMap("param", "value"), null, null, null);
assertEquals(s1, s2);
assertNotEquals(s1, differentType);
assertNotEquals(s1, differentOptional);
assertNotEquals(s1, differentDefault);
assertNotEquals(s1, differentName);
assertNotEquals(s1, differentVersion);
assertNotEquals(s1, differentDoc);
assertNotEquals(s1, differentParameters);
}
@Test
public void testArrayEquality() {
// Validate that the value type for the array is tested for equality. This test makes sure the same schema object is
// never reused to ensure we're actually checking equality
ConnectSchema s1 = new ConnectSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, null, SchemaBuilder.int8().build());
ConnectSchema s2 = new ConnectSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, null, SchemaBuilder.int8().build());
ConnectSchema differentValueSchema = new ConnectSchema(Schema.Type.ARRAY, false, null, null, null, null, null, null, null, SchemaBuilder.int16().build());
assertEquals(s1, s2);
assertNotEquals(s1, differentValueSchema);
}
@Test
public void testArrayDefaultValueEquality() {
ConnectSchema s1 = new ConnectSchema(Schema.Type.ARRAY, false, new String[] {"a", "b"}, null, null, null, null, null, null, SchemaBuilder.int8().build());
ConnectSchema s2 = new ConnectSchema(Schema.Type.ARRAY, false, new String[] {"a", "b"}, null, null, null, null, null, null, SchemaBuilder.int8().build());
ConnectSchema differentValueSchema = new ConnectSchema(Schema.Type.ARRAY, false, new String[] {"b", "c"}, null, null, null, null, null, null, SchemaBuilder.int8().build());
assertEquals(s1, s2);
assertNotEquals(s1, differentValueSchema);
}
@Test
public void testMapEquality() {
// Same as testArrayEquality, but for both key and value schemas
ConnectSchema s1 = new ConnectSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.int16().build());
ConnectSchema s2 = new ConnectSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.int16().build());
ConnectSchema differentKeySchema = new ConnectSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.string().build(), SchemaBuilder.int16().build());
ConnectSchema differentValueSchema = new ConnectSchema(Schema.Type.MAP, false, null, null, null, null, null, null, SchemaBuilder.int8().build(), SchemaBuilder.string().build());
assertEquals(s1, s2);
assertNotEquals(s1, differentKeySchema);
assertNotEquals(s1, differentValueSchema);
}
@Test
public void testStructEquality() {
// Same as testArrayEquality, but checks differences in fields. Only does a simple check, relying on tests of
// Field's equals() method to validate all variations in the list of fields will be checked
ConnectSchema s1 = new ConnectSchema(Schema.Type.STRUCT, false, null, null, null, null, null,
Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()),
new Field("field2", 1, SchemaBuilder.int16().build())), null, null);
ConnectSchema s2 = new ConnectSchema(Schema.Type.STRUCT, false, null, null, null, null, null,
Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()),
new Field("field2", 1, SchemaBuilder.int16().build())), null, null);
ConnectSchema differentField = new ConnectSchema(Schema.Type.STRUCT, false, null, null, null, null, null,
Arrays.asList(new Field("field", 0, SchemaBuilder.int8().build()),
new Field("different field name", 1, SchemaBuilder.int16().build())), null, null);
assertEquals(s1, s2);
assertNotEquals(s1, differentField);
}
@Test
public void testEmptyStruct() {
final ConnectSchema emptyStruct = new ConnectSchema(Schema.Type.STRUCT, false, null, null, null, null);
assertEquals(0, emptyStruct.fields().size());
new Struct(emptyStruct);
}
}