blob: 3a1920ed528ce05bc81e5b7d50814e07ebc88edb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.connect.transforms;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.data.Time;
import org.apache.kafka.connect.data.Timestamp;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.After;
import org.junit.Test;
import java.util.Calendar;
import java.util.Collections;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.Map;
import java.util.TimeZone;
import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
public class TimestampConverterTest {
private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
private static final Calendar EPOCH;
private static final Calendar TIME;
private static final Calendar DATE;
private static final Calendar DATE_PLUS_TIME;
private static final long DATE_PLUS_TIME_UNIX;
private static final String STRING_DATE_FMT = "yyyy MM dd HH mm ss SSS z";
private static final String DATE_PLUS_TIME_STRING;
private final TimestampConverter<SourceRecord> xformKey = new TimestampConverter.Key<>();
private final TimestampConverter<SourceRecord> xformValue = new TimestampConverter.Value<>();
static {
EPOCH = GregorianCalendar.getInstance(UTC);
EPOCH.setTimeInMillis(0L);
TIME = GregorianCalendar.getInstance(UTC);
TIME.setTimeInMillis(0L);
TIME.add(Calendar.MILLISECOND, 1234);
DATE = GregorianCalendar.getInstance(UTC);
DATE.setTimeInMillis(0L);
DATE.set(1970, Calendar.JANUARY, 1, 0, 0, 0);
DATE.add(Calendar.DATE, 1);
DATE_PLUS_TIME = GregorianCalendar.getInstance(UTC);
DATE_PLUS_TIME.setTimeInMillis(0L);
DATE_PLUS_TIME.add(Calendar.DATE, 1);
DATE_PLUS_TIME.add(Calendar.MILLISECOND, 1234);
DATE_PLUS_TIME_UNIX = DATE_PLUS_TIME.getTime().getTime();
DATE_PLUS_TIME_STRING = "1970 01 02 00 00 01 234 UTC";
}
// Configuration
@After
public void teardown() {
xformKey.close();
xformValue.close();
}
@Test(expected = ConfigException.class)
public void testConfigNoTargetType() {
xformValue.configure(Collections.<String, String>emptyMap());
}
@Test(expected = ConfigException.class)
public void testConfigInvalidTargetType() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "invalid"));
}
@Test(expected = ConfigException.class)
public void testConfigMissingFormat() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "string"));
}
@Test(expected = ConfigException.class)
public void testConfigInvalidFormat() {
Map<String, String> config = new HashMap<>();
config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string");
config.put(TimestampConverter.FORMAT_CONFIG, "bad-format");
xformValue.configure(config);
}
// Conversions without schemas (most flexible Timestamp -> other types)
@Test
public void testSchemalessIdentity() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime()));
assertNull(transformed.valueSchema());
assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
}
@Test
public void testSchemalessTimestampToDate() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Date"));
SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime()));
assertNull(transformed.valueSchema());
assertEquals(DATE.getTime(), transformed.value());
}
@Test
public void testSchemalessTimestampToTime() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Time"));
SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime()));
assertNull(transformed.valueSchema());
assertEquals(TIME.getTime(), transformed.value());
}
@Test
public void testSchemalessTimestampToUnix() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "unix"));
SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime()));
assertNull(transformed.valueSchema());
assertEquals(DATE_PLUS_TIME_UNIX, transformed.value());
}
@Test
public void testSchemalessTimestampToString() {
Map<String, String> config = new HashMap<>();
config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string");
config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
xformValue.configure(config);
SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME.getTime()));
assertNull(transformed.valueSchema());
assertEquals(DATE_PLUS_TIME_STRING, transformed.value());
}
// Conversions without schemas (core types -> most flexible Timestamp format)
@Test
public void testSchemalessDateToTimestamp() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE.getTime()));
assertNull(transformed.valueSchema());
// No change expected since the source type is coarser-grained
assertEquals(DATE.getTime(), transformed.value());
}
@Test
public void testSchemalessTimeToTimestamp() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
SourceRecord transformed = xformValue.apply(createRecordSchemaless(TIME.getTime()));
assertNull(transformed.valueSchema());
// No change expected since the source type is coarser-grained
assertEquals(TIME.getTime(), transformed.value());
}
@Test
public void testSchemalessUnixToTimestamp() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME_UNIX));
assertNull(transformed.valueSchema());
assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
}
@Test
public void testSchemalessStringToTimestamp() {
Map<String, String> config = new HashMap<>();
config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
xformValue.configure(config);
SourceRecord transformed = xformValue.apply(createRecordSchemaless(DATE_PLUS_TIME_STRING));
assertNull(transformed.valueSchema());
assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
}
// Conversions with schemas (most flexible Timestamp -> other types)
@Test
public void testWithSchemaIdentity() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
SourceRecord transformed = xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
}
@Test
public void testWithSchemaTimestampToDate() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Date"));
SourceRecord transformed = xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
assertEquals(Date.SCHEMA, transformed.valueSchema());
assertEquals(DATE.getTime(), transformed.value());
}
@Test
public void testWithSchemaTimestampToTime() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Time"));
SourceRecord transformed = xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
assertEquals(Time.SCHEMA, transformed.valueSchema());
assertEquals(TIME.getTime(), transformed.value());
}
@Test
public void testWithSchemaTimestampToUnix() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "unix"));
SourceRecord transformed = xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
assertEquals(Schema.INT64_SCHEMA, transformed.valueSchema());
assertEquals(DATE_PLUS_TIME_UNIX, transformed.value());
}
@Test
public void testWithSchemaTimestampToString() {
Map<String, String> config = new HashMap<>();
config.put(TimestampConverter.TARGET_TYPE_CONFIG, "string");
config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
xformValue.configure(config);
SourceRecord transformed = xformValue.apply(createRecordWithSchema(Timestamp.SCHEMA, DATE_PLUS_TIME.getTime()));
assertEquals(Schema.STRING_SCHEMA, transformed.valueSchema());
assertEquals(DATE_PLUS_TIME_STRING, transformed.value());
}
// Null-value conversions schemaless
@Test
public void testSchemalessNullValueToString() {
testSchemalessNullValueConversion("string");
testSchemalessNullFieldConversion("string");
}
@Test
public void testSchemalessNullValueToDate() {
testSchemalessNullValueConversion("Date");
testSchemalessNullFieldConversion("Date");
}
@Test
public void testSchemalessNullValueToTimestamp() {
testSchemalessNullValueConversion("Timestamp");
testSchemalessNullFieldConversion("Timestamp");
}
@Test
public void testSchemalessNullValueToUnix() {
testSchemalessNullValueConversion("unix");
testSchemalessNullFieldConversion("unix");
}
@Test
public void testSchemalessNullValueToTime() {
testSchemalessNullValueConversion("Time");
testSchemalessNullFieldConversion("Time");
}
private void testSchemalessNullValueConversion(String targetType) {
Map<String, String> config = new HashMap<>();
config.put(TimestampConverter.TARGET_TYPE_CONFIG, targetType);
config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
xformValue.configure(config);
SourceRecord transformed = xformValue.apply(createRecordSchemaless(null));
assertNull(transformed.valueSchema());
assertNull(transformed.value());
}
private void testSchemalessNullFieldConversion(String targetType) {
Map<String, String> config = new HashMap<>();
config.put(TimestampConverter.TARGET_TYPE_CONFIG, targetType);
config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
config.put(TimestampConverter.FIELD_CONFIG, "ts");
xformValue.configure(config);
SourceRecord transformed = xformValue.apply(createRecordSchemaless(null));
assertNull(transformed.valueSchema());
assertNull(transformed.value());
}
// Conversions with schemas (core types -> most flexible Timestamp format)
@Test
public void testWithSchemaDateToTimestamp() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
SourceRecord transformed = xformValue.apply(createRecordWithSchema(Date.SCHEMA, DATE.getTime()));
assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
// No change expected since the source type is coarser-grained
assertEquals(DATE.getTime(), transformed.value());
}
@Test
public void testWithSchemaTimeToTimestamp() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
SourceRecord transformed = xformValue.apply(createRecordWithSchema(Time.SCHEMA, TIME.getTime()));
assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
// No change expected since the source type is coarser-grained
assertEquals(TIME.getTime(), transformed.value());
}
@Test
public void testWithSchemaUnixToTimestamp() {
xformValue.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
SourceRecord transformed = xformValue.apply(createRecordWithSchema(Schema.INT64_SCHEMA, DATE_PLUS_TIME_UNIX));
assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
}
@Test
public void testWithSchemaStringToTimestamp() {
Map<String, String> config = new HashMap<>();
config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
xformValue.configure(config);
SourceRecord transformed = xformValue.apply(createRecordWithSchema(Schema.STRING_SCHEMA, DATE_PLUS_TIME_STRING));
assertEquals(Timestamp.SCHEMA, transformed.valueSchema());
assertEquals(DATE_PLUS_TIME.getTime(), transformed.value());
}
// Null-value conversions with schema
@Test
public void testWithSchemaNullValueToTimestamp() {
testWithSchemaNullValueConversion("Timestamp", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
testWithSchemaNullValueConversion("Timestamp", TimestampConverter.OPTIONAL_TIME_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
testWithSchemaNullValueConversion("Timestamp", TimestampConverter.OPTIONAL_DATE_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
testWithSchemaNullValueConversion("Timestamp", Schema.OPTIONAL_STRING_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
testWithSchemaNullValueConversion("Timestamp", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
}
@Test
public void testWithSchemaNullFieldToTimestamp() {
testWithSchemaNullFieldConversion("Timestamp", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
testWithSchemaNullFieldConversion("Timestamp", TimestampConverter.OPTIONAL_TIME_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
testWithSchemaNullFieldConversion("Timestamp", TimestampConverter.OPTIONAL_DATE_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
testWithSchemaNullFieldConversion("Timestamp", Schema.OPTIONAL_STRING_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
testWithSchemaNullFieldConversion("Timestamp", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA);
}
@Test
public void testWithSchemaNullValueToUnix() {
testWithSchemaNullValueConversion("unix", Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
testWithSchemaNullValueConversion("unix", TimestampConverter.OPTIONAL_TIME_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
testWithSchemaNullValueConversion("unix", TimestampConverter.OPTIONAL_DATE_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
testWithSchemaNullValueConversion("unix", Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
testWithSchemaNullValueConversion("unix", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
}
@Test
public void testWithSchemaNullFieldToUnix() {
testWithSchemaNullFieldConversion("unix", Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
testWithSchemaNullFieldConversion("unix", TimestampConverter.OPTIONAL_TIME_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
testWithSchemaNullFieldConversion("unix", TimestampConverter.OPTIONAL_DATE_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
testWithSchemaNullFieldConversion("unix", Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
testWithSchemaNullFieldConversion("unix", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, Schema.OPTIONAL_INT64_SCHEMA);
}
@Test
public void testWithSchemaNullValueToTime() {
testWithSchemaNullValueConversion("Time", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA);
testWithSchemaNullValueConversion("Time", TimestampConverter.OPTIONAL_TIME_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA);
testWithSchemaNullValueConversion("Time", TimestampConverter.OPTIONAL_DATE_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA);
testWithSchemaNullValueConversion("Time", Schema.OPTIONAL_STRING_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA);
testWithSchemaNullValueConversion("Time", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA);
}
@Test
public void testWithSchemaNullFieldToTime() {
testWithSchemaNullFieldConversion("Time", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA);
testWithSchemaNullFieldConversion("Time", TimestampConverter.OPTIONAL_TIME_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA);
testWithSchemaNullFieldConversion("Time", TimestampConverter.OPTIONAL_DATE_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA);
testWithSchemaNullFieldConversion("Time", Schema.OPTIONAL_STRING_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA);
testWithSchemaNullFieldConversion("Time", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, TimestampConverter.OPTIONAL_TIME_SCHEMA);
}
@Test
public void testWithSchemaNullValueToDate() {
testWithSchemaNullValueConversion("Date", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA);
testWithSchemaNullValueConversion("Date", TimestampConverter.OPTIONAL_TIME_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA);
testWithSchemaNullValueConversion("Date", TimestampConverter.OPTIONAL_DATE_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA);
testWithSchemaNullValueConversion("Date", Schema.OPTIONAL_STRING_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA);
testWithSchemaNullValueConversion("Date", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA);
}
@Test
public void testWithSchemaNullFieldToDate() {
testWithSchemaNullFieldConversion("Date", Schema.OPTIONAL_INT64_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA);
testWithSchemaNullFieldConversion("Date", TimestampConverter.OPTIONAL_TIME_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA);
testWithSchemaNullFieldConversion("Date", TimestampConverter.OPTIONAL_DATE_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA);
testWithSchemaNullFieldConversion("Date", Schema.OPTIONAL_STRING_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA);
testWithSchemaNullFieldConversion("Date", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, TimestampConverter.OPTIONAL_DATE_SCHEMA);
}
@Test
public void testWithSchemaNullValueToString() {
testWithSchemaNullValueConversion("string", Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
testWithSchemaNullValueConversion("string", TimestampConverter.OPTIONAL_TIME_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
testWithSchemaNullValueConversion("string", TimestampConverter.OPTIONAL_DATE_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
testWithSchemaNullValueConversion("string", Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
testWithSchemaNullValueConversion("string", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
}
@Test
public void testWithSchemaNullFieldToString() {
testWithSchemaNullFieldConversion("string", Schema.OPTIONAL_INT64_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
testWithSchemaNullFieldConversion("string", TimestampConverter.OPTIONAL_TIME_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
testWithSchemaNullFieldConversion("string", TimestampConverter.OPTIONAL_DATE_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
testWithSchemaNullFieldConversion("string", Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
testWithSchemaNullFieldConversion("string", TimestampConverter.OPTIONAL_TIMESTAMP_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
}
private void testWithSchemaNullValueConversion(String targetType, Schema originalSchema, Schema expectedSchema) {
Map<String, String> config = new HashMap<>();
config.put(TimestampConverter.TARGET_TYPE_CONFIG, targetType);
config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
xformValue.configure(config);
SourceRecord transformed = xformValue.apply(createRecordWithSchema(originalSchema, null));
assertEquals(expectedSchema, transformed.valueSchema());
assertNull(transformed.value());
}
private void testWithSchemaNullFieldConversion(String targetType, Schema originalSchema, Schema expectedSchema) {
Map<String, String> config = new HashMap<>();
config.put(TimestampConverter.TARGET_TYPE_CONFIG, targetType);
config.put(TimestampConverter.FORMAT_CONFIG, STRING_DATE_FMT);
config.put(TimestampConverter.FIELD_CONFIG, "ts");
xformValue.configure(config);
SchemaBuilder structSchema = SchemaBuilder.struct()
.field("ts", originalSchema)
.field("other", Schema.STRING_SCHEMA);
SchemaBuilder expectedStructSchema = SchemaBuilder.struct()
.field("ts", expectedSchema)
.field("other", Schema.STRING_SCHEMA);
Struct original = new Struct(structSchema);
original.put("ts", null);
original.put("other", "test");
// Struct field is null
SourceRecord transformed = xformValue.apply(createRecordWithSchema(structSchema.build(), original));
assertEquals(expectedStructSchema.build(), transformed.valueSchema());
assertNull(requireStruct(transformed.value(), "").get("ts"));
// entire Struct is null
transformed = xformValue.apply(createRecordWithSchema(structSchema.optional().build(), null));
assertEquals(expectedStructSchema.optional().build(), transformed.valueSchema());
assertNull(transformed.value());
}
// Convert field instead of entire key/value
@Test
public void testSchemalessFieldConversion() {
Map<String, String> config = new HashMap<>();
config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Date");
config.put(TimestampConverter.FIELD_CONFIG, "ts");
xformValue.configure(config);
Object value = Collections.singletonMap("ts", DATE_PLUS_TIME.getTime());
SourceRecord transformed = xformValue.apply(createRecordSchemaless(value));
assertNull(transformed.valueSchema());
assertEquals(Collections.singletonMap("ts", DATE.getTime()), transformed.value());
}
@Test
public void testWithSchemaFieldConversion() {
Map<String, String> config = new HashMap<>();
config.put(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp");
config.put(TimestampConverter.FIELD_CONFIG, "ts");
xformValue.configure(config);
// ts field is a unix timestamp
Schema structWithTimestampFieldSchema = SchemaBuilder.struct()
.field("ts", Schema.INT64_SCHEMA)
.field("other", Schema.STRING_SCHEMA)
.build();
Struct original = new Struct(structWithTimestampFieldSchema);
original.put("ts", DATE_PLUS_TIME_UNIX);
original.put("other", "test");
SourceRecord transformed = xformValue.apply(createRecordWithSchema(structWithTimestampFieldSchema, original));
Schema expectedSchema = SchemaBuilder.struct()
.field("ts", Timestamp.SCHEMA)
.field("other", Schema.STRING_SCHEMA)
.build();
assertEquals(expectedSchema, transformed.valueSchema());
assertEquals(DATE_PLUS_TIME.getTime(), ((Struct) transformed.value()).get("ts"));
assertEquals("test", ((Struct) transformed.value()).get("other"));
}
// Validate Key implementation in addition to Value
@Test
public void testKey() {
xformKey.configure(Collections.singletonMap(TimestampConverter.TARGET_TYPE_CONFIG, "Timestamp"));
SourceRecord transformed = xformKey.apply(new SourceRecord(null, null, "topic", 0, null, DATE_PLUS_TIME.getTime(), null, null));
assertNull(transformed.keySchema());
assertEquals(DATE_PLUS_TIME.getTime(), transformed.key());
}
private SourceRecord createRecordWithSchema(Schema schema, Object value) {
return new SourceRecord(null, null, "topic", 0, schema, value);
}
private SourceRecord createRecordSchemaless(Object value) {
return createRecordWithSchema(null, value);
}
}