[FLINK-28036][hive] HiveInspectors should use correct writable type to create ConstantObjectInspector

This closes #19949
diff --git a/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java b/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
index 0dfd290..afa3b96 100644
--- a/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
+++ b/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/conversion/HiveInspectors.java
@@ -37,9 +37,12 @@
 import org.apache.hadoop.hive.common.type.HiveChar;
 import org.apache.hadoop.hive.common.type.HiveDecimal;
 import org.apache.hadoop.hive.common.type.HiveVarchar;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
 import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
 import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -88,12 +91,10 @@
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.ShortWritable;
 import org.apache.hadoop.io.Text;
 
 import javax.annotation.Nullable;
@@ -511,7 +512,7 @@
             case BINARY:
                 className = WritableConstantBinaryObjectInspector.class.getName();
                 return HiveReflectionUtils.createConstantObjectInspector(
-                        className, ByteWritable.class, value);
+                        className, BytesWritable.class, value);
             case UNKNOWN:
             case VOID:
                 // If type is null, we use the Constant String to replace
diff --git a/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java b/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
index 1b1dd03..7ded892 100644
--- a/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
+++ b/flink-connector-hive/src/test/java/org/apache/flink/table/functions/hive/HiveGenericUDFTest.java
@@ -286,6 +286,53 @@
         assertThat(udf.eval(result)).isEqualTo(3);
     }
 
+    @Test
+    public void testInitUDFWithConstantArguments() {
+        // test init udf with different type of constants as arguments to
+        // make sure we can get the ConstantObjectInspector normally
+
+        // test with byte type as constant argument
+        init(GenericUDFCoalesce.class, new Object[] {1}, new DataType[] {DataTypes.TINYINT()});
+        // test with short type as constant argument
+        init(GenericUDFCoalesce.class, new Object[] {1}, new DataType[] {DataTypes.SMALLINT()});
+        // test with int type as constant argument
+        init(GenericUDFCoalesce.class, new Object[] {1}, new DataType[] {DataTypes.INT()});
+        // test with long type as constant argument
+        init(GenericUDFCoalesce.class, new Object[] {1}, new DataType[] {DataTypes.BIGINT()});
+        // test with float type as constant argument
+        init(GenericUDFCoalesce.class, new Object[] {1}, new DataType[] {DataTypes.FLOAT()});
+        // test with double type as constant argument
+        init(GenericUDFCoalesce.class, new Object[] {1}, new DataType[] {DataTypes.DOUBLE()});
+        // test with string type as constant argument
+        init(GenericUDFCoalesce.class, new Object[] {"test"}, new DataType[] {DataTypes.STRING()});
+        // test with char type as constant argument
+        init(GenericUDFCoalesce.class, new Object[] {"tes"}, new DataType[] {DataTypes.CHAR(7)});
+        // test with varchar type as constant argument
+        init(GenericUDFCoalesce.class, new Object[] {"tes"}, new DataType[] {DataTypes.VARCHAR(7)});
+        // test with date type as constant argument
+        init(
+                GenericUDFCoalesce.class,
+                new Object[] {new Date(10000)},
+                new DataType[] {DataTypes.DATE()});
+        // test with timestamp type as constant argument
+        init(
+                GenericUDFCoalesce.class,
+                new Object[] {new Timestamp(10000)},
+                new DataType[] {DataTypes.TIMESTAMP()});
+
+        // test with decimal type as constant argument
+        init(
+                GenericUDFCoalesce.class,
+                new Object[] {new BigDecimal("23.45")},
+                new DataType[] {DataTypes.DECIMAL(10, 3)});
+
+        // test with binary type as constant argument
+        init(
+                GenericUDFCoalesce.class,
+                new Object[] {new byte[] {1, 2}},
+                new DataType[] {DataTypes.BYTES()});
+    }
+
     private static HiveGenericUDF init(
             Class hiveUdfClass, Object[] constantArgs, DataType[] argTypes) {
         HiveGenericUDF udf =