[fix](lookup)enhance compatibility for lookup join with the java.sql.Timestamp (#341)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
index afda237..130d236 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
@@ -47,6 +47,7 @@
import java.io.Serializable;
import java.math.BigDecimal;
import java.sql.Date;
+import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Arrays;
@@ -195,9 +196,11 @@
return val -> {
if (val instanceof LocalDateTime) {
return TimestampData.fromLocalDateTime((LocalDateTime) val);
+ } else if (val instanceof Timestamp) {
+ return TimestampData.fromTimestamp((Timestamp) val);
} else {
throw new UnsupportedOperationException(
- "timestamp type must be java.time.LocalDateTime, the actual type is: "
+ "timestamp type must be java.time.LocalDateTime or java.sql.Timestamp, the actual type is: "
+ val.getClass().getName());
}
};
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
index b63d033..c9016c5 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
@@ -37,6 +37,7 @@
import java.io.IOException;
import java.io.Serializable;
import java.math.BigDecimal;
+import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.ArrayList;
@@ -65,7 +66,9 @@
Column.physical("f13", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()),
Column.physical("f14", DataTypes.DATE()),
Column.physical("f15", DataTypes.CHAR(1)),
- Column.physical("f16", DataTypes.VARCHAR(256)));
+ Column.physical("f16", DataTypes.VARCHAR(256)),
+ Column.physical("f17", DataTypes.TIMESTAMP_WITH_TIME_ZONE()),
+ Column.physical("f18", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()));
DorisRowConverter converter =
new DorisRowConverter((RowType) schema.toPhysicalRowDataType().getLogicalType());
@@ -73,6 +76,8 @@
LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000);
LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000);
LocalDate date1 = LocalDate.of(2021, 1, 1);
+ Timestamp timestamp1 = Timestamp.valueOf(time1);
+ Timestamp timestamp2 = Timestamp.valueOf(time2);
List<Object> record =
Arrays.asList(
null,
@@ -90,7 +95,9 @@
time2,
date1,
"a",
- "doris");
+ "doris",
+ timestamp1,
+ timestamp2);
GenericRowData rowData = converter.convertInternal(record);
RowDataSerializer serializer =
@@ -101,12 +108,12 @@
.setFieldNames(
new String[] {
"f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "f10",
- "f11", "f12", "f13", "f14", "f15", "f16"
+ "f11", "f12", "f13", "f14", "f15", "f16", "f17", "f18"
})
.build();
String s = new String(serializer.serialize(rowData).getRow());
Assert.assertEquals(
- "\\N|true|1.2|1.2345|24|10|1|32|64|128|10.12|2021-01-01 08:01:01.000001|2021-01-01 08:01:01.000001|2021-01-01|a|doris",
+ "\\N|true|1.2|1.2345|24|10|1|32|64|128|10.12|2021-01-01 08:01:01.000001|2021-01-01 08:01:01.000001|2021-01-01|a|doris|2021-01-01 08:01:01.000001|2021-01-01 08:01:01.000001",
s);
}
@@ -129,12 +136,16 @@
Column.physical("f13", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()),
Column.physical("f14", DataTypes.DATE()),
Column.physical("f15", DataTypes.CHAR(1)),
- Column.physical("f16", DataTypes.VARCHAR(256)));
+ Column.physical("f16", DataTypes.VARCHAR(256)),
+ Column.physical("f17", DataTypes.TIMESTAMP_WITH_TIME_ZONE()),
+ Column.physical("f18", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()));
DorisRowConverter converter =
new DorisRowConverter((RowType) schema.toPhysicalRowDataType().getLogicalType());
// Doris DatetimeV2 supports up to 6 decimal places (microseconds).
LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000);
LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000);
+ Timestamp timestamp1 = Timestamp.valueOf(time1);
+ Timestamp timestamp2 = Timestamp.valueOf(time2);
LocalDate date1 = LocalDate.of(2021, 1, 1);
GenericRowData rowData =
GenericRowData.of(
@@ -153,13 +164,16 @@
TimestampData.fromLocalDateTime(time2),
(int) date1.toEpochDay(),
StringData.fromString("a"),
- StringData.fromString("doris"));
+ StringData.fromString("doris"),
+ TimestampData.fromTimestamp(timestamp1),
+ TimestampData.fromTimestamp(timestamp2));
List<Object> row = new ArrayList<>();
for (int i = 0; i < rowData.getArity(); i++) {
row.add(converter.convertExternal(rowData, i));
}
+ // System.out.println(row.toString());
Assert.assertEquals(
- "[null, true, 1.2, 1.2345, 24, 10, 1, 32, 64, 128, 10.123, 2021-01-01 08:01:01.000001, 2021-01-01 08:01:01.000001, 2021-01-01, a, doris]",
+ "[null, true, 1.2, 1.2345, 24, 10, 1, 32, 64, 128, 10.123, 2021-01-01 08:01:01.000001, 2021-01-01 08:01:01.000001, 2021-01-01, a, doris, 2021-01-01 08:01:01.000001, 2021-01-01 08:01:01.000001]",
row.toString());
}