[FLINK-28036][hive] HiveInspectors should use correct writable type to create ConstantObjectInspector
This closes #19949
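HiveInspectors builds constant object inspectors reflectively through HiveReflectionUtils.createConstantObjectInspector(className, writableClass, value), so the writable class handed to the lookup must match the constructor parameter of the corresponding Hive WritableConstant*ObjectInspector. Previously the class imported org.apache.hadoop.io.ByteWritable/ShortWritable/DoubleWritable and also passed ByteWritable.class for BINARY constants; Hive, however, builds the TINYINT/SMALLINT/DOUBLE constant inspectors from its serde2 writables and the BINARY constant inspector from org.apache.hadoop.io.BytesWritable, so the reflective lookup could not find a matching constructor and UDF initialization failed. The following is a minimal, hedged sketch (not Flink code) of the reflective construction for the BINARY case; it assumes Hive's package-private WritableConstantBinaryObjectInspector(BytesWritable) constructor.

    import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantBinaryObjectInspector;
    import org.apache.hadoop.io.BytesWritable;

    import java.lang.reflect.Constructor;

    public class ConstantBinaryInspectorSketch {

        public static void main(String[] args) throws Exception {
            // BINARY constants are wrapped in org.apache.hadoop.io.BytesWritable, so the
            // constructor lookup must use BytesWritable.class; ByteWritable.class (the
            // TINYINT writable) has no matching constructor here.
            Constructor<WritableConstantBinaryObjectInspector> ctor =
                    WritableConstantBinaryObjectInspector.class.getDeclaredConstructor(
                            BytesWritable.class);
            ctor.setAccessible(true);

            WritableConstantBinaryObjectInspector inspector =
                    ctor.newInstance(new BytesWritable(new byte[] {1, 2}));
            System.out.println(inspector.getWritableConstantValue());
        }
    }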
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index 0dfd290..afa3b96 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -37,9 +37,12 @@
import org.apache.hadoop.hive.common.type.HiveChar;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -88,12 +91,10 @@
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.ShortWritable;
import org.apache.hadoop.io.Text;

import javax.annotation.Nullable;
@@ -511,7 +512,7 @@
case BINARY:
className = WritableConstantBinaryObjectInspector.class.getName();
return HiveReflectionUtils.createConstantObjectInspector(
- className, ByteWritable.class, value);
+ className, BytesWritable.class, value);
case UNKNOWN:
case VOID:
// If type is null, we use the Constant String to replace
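The import swap above follows the same reasoning for the other primitive constants: Hive's WritableConstantByteObjectInspector, WritableConstantShortObjectInspector and WritableConstantDoubleObjectInspector are constructed from the serde2 writables in org.apache.hadoop.hive.serde2.io, not from their org.apache.hadoop.io counterparts. A hedged sketch of the TINYINT case, again assuming the package-private constructor pattern of Hive's constant inspectors:

    import org.apache.hadoop.hive.serde2.io.ByteWritable; // Hive's TINYINT writable
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableConstantByteObjectInspector;

    import java.lang.reflect.Constructor;

    public class ConstantTinyintInspectorSketch {

        public static void main(String[] args) throws Exception {
            // org.apache.hadoop.io.ByteWritable would not match this constructor.
            Constructor<WritableConstantByteObjectInspector> ctor =
                    WritableConstantByteObjectInspector.class.getDeclaredConstructor(
                            ByteWritable.class);
            ctor.setAccessible(true);
            System.out.println(
                    ctor.newInstance(new ByteWritable((byte) 1)).getWritableConstantValue());
        }
    }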
diff --git a/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java b/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
index 1b1dd03..7ded892 100644
--- a/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
+++ b/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
@@ -286,6 +286,53 @@
assertThat(udf.eval(result)).isEqualTo(3);
}
+ @Test
+ public void testInitUDFWithConstantArguments() {
+ // test initializing the UDF with constants of different types as arguments
+ // to make sure the corresponding ConstantObjectInspector can be created
+
+ // test with byte type as constant argument
+ init(GenericUDFCoalesce.class, new Object[] {1}, new DataType[] {DataTypes.TINYINT()});
+ // test with short type as constant argument
+ init(GenericUDFCoalesce.class, new Object[] {1}, new DataType[] {DataTypes.SMALLINT()});
+ // test with int type as constant argument
+ init(GenericUDFCoalesce.class, new Object[] {1}, new DataType[] {DataTypes.INT()});
+ // test with long type as constant argument
+ init(GenericUDFCoalesce.class, new Object[] {1}, new DataType[] {DataTypes.BIGINT()});
+ // test with float type as constant argument
+ init(GenericUDFCoalesce.class, new Object[] {1}, new DataType[] {DataTypes.FLOAT()});
+ // test with double type as constant argument
+ init(GenericUDFCoalesce.class, new Object[] {1}, new DataType[] {DataTypes.DOUBLE()});
+ // test with string type as constant argument
+ init(GenericUDFCoalesce.class, new Object[] {"test"}, new DataType[] {DataTypes.STRING()});
+ // test with char type as constant argument
+ init(GenericUDFCoalesce.class, new Object[] {"tes"}, new DataType[] {DataTypes.CHAR(7)});
+ // test with varchar type as constant argument
+ init(GenericUDFCoalesce.class, new Object[] {"tes"}, new DataType[] {DataTypes.VARCHAR(7)});
+ // test with date type as constant argument
+ init(
+ GenericUDFCoalesce.class,
+ new Object[] {new Date(10000)},
+ new DataType[] {DataTypes.DATE()});
+ // test with timestamp type as constant argument
+ init(
+ GenericUDFCoalesce.class,
+ new Object[] {new Timestamp(10000)},
+ new DataType[] {DataTypes.TIMESTAMP()});
+
+ // test with decimal type as constant argument
+ init(
+ GenericUDFCoalesce.class,
+ new Object[] {new BigDecimal("23.45")},
+ new DataType[] {DataTypes.DECIMAL(10, 3)});
+
+ // test with binary type as constant argument
+ init(
+ GenericUDFCoalesce.class,
+ new Object[] {new byte[] {1, 2}},
+ new DataType[] {DataTypes.BYTES()});
+ }
+
private static HiveGenericUDF init(
Class hiveUdfClass, Object[] constantArgs, DataType[] argTypes) {
HiveGenericUDF udf =