KAFKA-7476: Fix Date-based types in SchemaProjector
Various converters (AvroConverter and JsonConverter) produce a
SchemaAndValue consisting of a logical schema type and a java.util.Date.
This is a fix for SchemaProjector to properly handle the Date.
Author: Robert Yokota <rayokota@gmail.com>
Reviewers: Konstantine Karantasis <konstantine@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>
Closes #5736 from rayokota/KAFKA-7476
(cherry picked from commit 3edd8e7333ec0bb32ab5ae4ec4814fe30bb8f91d)
Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
index 6277e44..08ee37a 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/SchemaProjector.java
@@ -159,7 +159,7 @@
assert source.type().isPrimitive();
assert target.type().isPrimitive();
Object result;
- if (isPromotable(source.type(), target.type())) {
+ if (isPromotable(source.type(), target.type()) && record instanceof Number) {
Number numberRecord = (Number) record;
switch (target.type()) {
case INT8:
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
index 101be04..ef6d029 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/SchemaProjectorTest.java
@@ -351,6 +351,17 @@
projected = SchemaProjector.project(Timestamp.SCHEMA, 34567L, Timestamp.SCHEMA);
assertEquals(34567L, projected);
+ java.util.Date date = new java.util.Date();
+
+ projected = SchemaProjector.project(Date.SCHEMA, date, Date.SCHEMA);
+ assertEquals(date, projected);
+
+ projected = SchemaProjector.project(Time.SCHEMA, date, Time.SCHEMA);
+ assertEquals(date, projected);
+
+ projected = SchemaProjector.project(Timestamp.SCHEMA, date, Timestamp.SCHEMA);
+ assertEquals(date, projected);
+
Schema namedSchema = SchemaBuilder.int32().name("invalidLogicalTypeName").build();
for (Schema logicalTypeSchema: logicalTypeSchemas) {
try {