[CALCITE-6210] Cast to VARBINARY causes an assertion failure

Signed-off-by: Mihai Budiu <mbudiu@feldera.com>
diff --git a/babel/src/test/resources/sql/redshift.iq b/babel/src/test/resources/sql/redshift.iq
index 4917e31..e0ef58a 100755
--- a/babel/src/test/resources/sql/redshift.iq
+++ b/babel/src/test/resources/sql/redshift.iq
@@ -1777,7 +1777,7 @@
 
 -- returns 8 (cf OCTET_LENGTH)
 select length('français');
-SELECT "LENGTH"(u&'fran\00e7ais')
+SELECT "LENGTH"('français')
 !explain-validated-on calcite
 
 # LOWER
@@ -1824,7 +1824,7 @@
 # OCTET_LENGTH
 -- returns 9 (cf LENGTH)
 select octet_length('français');
-SELECT OCTET_LENGTH(CAST(u&'fran\00e7ais' AS VARBINARY))
+SELECT OCTET_LENGTH(CAST('français' AS VARBINARY))
 !explain-validated-on calcite
 
 # POSITION is a synonym for STRPOS
diff --git a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexToLixTranslator.java b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexToLixTranslator.java
index 4ea1d23..362ec20 100644
--- a/core/src/main/java/org/apache/calcite/adapter/enumerable/RexToLixTranslator.java
+++ b/core/src/main/java/org/apache/calcite/adapter/enumerable/RexToLixTranslator.java
@@ -80,6 +80,7 @@
 import java.lang.reflect.Modifier;
 import java.lang.reflect.Type;
 import java.math.BigDecimal;
+import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -308,6 +309,18 @@
     case ANY:
       return operand;
 
+    case VARBINARY:
+    case BINARY:
+      switch (sourceType.getSqlTypeName()) {
+      case CHAR:
+      case VARCHAR:
+        return Expressions.call(BuiltInMethod.STRING_TO_BINARY.method, operand,
+            new ConstantExpression(Charset.class, sourceType.getCharset()));
+
+      default:
+        return defaultExpression.get();
+      }
+
     case GEOMETRY:
       switch (sourceType.getSqlTypeName()) {
       case CHAR:
diff --git a/core/src/main/java/org/apache/calcite/rex/RexUtil.java b/core/src/main/java/org/apache/calcite/rex/RexUtil.java
index 5d7f473..e180436 100644
--- a/core/src/main/java/org/apache/calcite/rex/RexUtil.java
+++ b/core/src/main/java/org/apache/calcite/rex/RexUtil.java
@@ -1683,7 +1683,8 @@
    *
    * @param source source type
    * @param target target type
-   * @return true iff the conversion is a loss-less cast
+   * @return 'true' when the conversion can certainly be determined to be loss-less cast,
+   *         but may return 'false' for some lossless casts.
    */
   @API(since = "1.22", status = API.Status.EXPERIMENTAL)
   public static boolean isLosslessCast(RelDataType source, RelDataType target) {
diff --git a/core/src/main/java/org/apache/calcite/runtime/SqlFunctions.java b/core/src/main/java/org/apache/calcite/runtime/SqlFunctions.java
index d3d865d..9613955 100644
--- a/core/src/main/java/org/apache/calcite/runtime/SqlFunctions.java
+++ b/core/src/main/java/org/apache/calcite/runtime/SqlFunctions.java
@@ -4575,6 +4575,14 @@
     }
   }
 
+  public static @PolyNull ByteString stringToBinary(@PolyNull String s, Charset charset) {
+    if (s == null) {
+      return null;
+    } else {
+      return new ByteString(s.getBytes(charset));
+    }
+  }
+
   /** Helper for CAST(... AS VARBINARY(maxLength)). */
   public static @PolyNull ByteString truncate(@PolyNull ByteString s, int maxLength) {
     if (s == null) {
diff --git a/core/src/main/java/org/apache/calcite/sql/SqlDialect.java b/core/src/main/java/org/apache/calcite/sql/SqlDialect.java
index 845d351..b8877fd 100644
--- a/core/src/main/java/org/apache/calcite/sql/SqlDialect.java
+++ b/core/src/main/java/org/apache/calcite/sql/SqlDialect.java
@@ -433,17 +433,13 @@
    */
   public void quoteStringLiteral(StringBuilder buf, @Nullable String charsetName,
       String val) {
-    if (containsNonAscii(val) && charsetName == null) {
-      quoteStringLiteralUnicode(buf, val);
-    } else {
-      if (charsetName != null) {
-        buf.append("_");
-        buf.append(charsetName);
-      }
-      buf.append(literalQuoteString);
-      buf.append(val.replace(literalEndQuoteString, literalEscapedQuote));
-      buf.append(literalEndQuoteString);
+    if (charsetName != null) {
+      buf.append("_");
+      buf.append(charsetName);
     }
+    buf.append(literalQuoteString);
+    buf.append(val.replace(literalEndQuoteString, literalEscapedQuote));
+    buf.append(literalEndQuoteString);
   }
 
   public void unparseCall(SqlWriter writer, SqlCall call, int leftPrec,
diff --git a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
index 6b4e948..650dc4e 100644
--- a/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
+++ b/core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
@@ -124,6 +124,7 @@
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.math.BigDecimal;
+import java.nio.charset.Charset;
 import java.sql.ResultSet;
 import java.sql.Time;
 import java.sql.Timestamp;
@@ -604,6 +605,7 @@
       String.class, TimeZone.class),
   STRING_TO_TIMESTAMP_WITH_LOCAL_TIME_ZONE(SqlFunctions.class, "toTimestampWithLocalTimeZone",
       String.class),
+  STRING_TO_BINARY(SqlFunctions.class, "stringToBinary", String.class, Charset.class),
   TIMESTAMP_STRING_TO_TIMESTAMP_WITH_LOCAL_TIME_ZONE(SqlFunctions.class,
       "toTimestampWithLocalTimeZone", String.class, TimeZone.class),
   TIME_WITH_LOCAL_TIME_ZONE_TO_TIME(SqlFunctions.class, "timeWithLocalTimeZoneToTime",
diff --git a/core/src/test/java/org/apache/calcite/rel/rel2sql/RelToSqlConverterTest.java b/core/src/test/java/org/apache/calcite/rel/rel2sql/RelToSqlConverterTest.java
index 6affbf4..5e4e6b1 100644
--- a/core/src/test/java/org/apache/calcite/rel/rel2sql/RelToSqlConverterTest.java
+++ b/core/src/test/java/org/apache/calcite/rel/rel2sql/RelToSqlConverterTest.java
@@ -7020,7 +7020,7 @@
     final String expected0 = "SELECT JSON_INSERT(\"product_name\", '$', 10)\n"
         + "FROM \"foodmart\".\"product\"";
     final String expected1 = "SELECT JSON_INSERT(NULL, '$', 10, '$', NULL, '$', "
-        + "u&'\\000a\\0009\\000a')\nFROM \"foodmart\".\"product\"";
+        + "'\n\t\n')\nFROM \"foodmart\".\"product\"";
     sql(query0).ok(expected0);
     sql(query1).ok(expected1);
   }
@@ -7032,7 +7032,7 @@
     final String expected = "SELECT JSON_REPLACE(\"product_name\", '$', 10)\n"
         + "FROM \"foodmart\".\"product\"";
     final String expected1 = "SELECT JSON_REPLACE(NULL, '$', 10, '$', NULL, '$', "
-        + "u&'\\000a\\0009\\000a')\nFROM \"foodmart\".\"product\"";
+        + "'\n\t\n')\nFROM \"foodmart\".\"product\"";
     sql(query).ok(expected);
     sql(query1).ok(expected1);
   }
@@ -7044,7 +7044,7 @@
     final String expected = "SELECT JSON_SET(\"product_name\", '$', 10)\n"
         + "FROM \"foodmart\".\"product\"";
     final String expected1 = "SELECT JSON_SET(NULL, '$', 10, '$', NULL, '$', "
-        + "u&'\\000a\\0009\\000a')\nFROM \"foodmart\".\"product\"";
+        + "'\n\t\n')\nFROM \"foodmart\".\"product\"";
     sql(query).ok(expected);
     sql(query1).ok(expected1);
   }
diff --git a/linq4j/src/main/java/org/apache/calcite/linq4j/tree/ConstantExpression.java b/linq4j/src/main/java/org/apache/calcite/linq4j/tree/ConstantExpression.java
index 3f92bc1..8e96c40 100644
--- a/linq4j/src/main/java/org/apache/calcite/linq4j/tree/ConstantExpression.java
+++ b/linq4j/src/main/java/org/apache/calcite/linq4j/tree/ConstantExpression.java
@@ -23,6 +23,7 @@
 import java.lang.reflect.Type;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.nio.charset.Charset;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
@@ -208,6 +209,12 @@
     if (value instanceof Set) {
       return writeSet(writer, (Set) value);
     }
+    if (value instanceof Charset) {
+      writer.append("java.nio.charset.Charset.forName(\"");
+      writer.append(value);
+      writer.append("\")");
+      return writer;
+    }
 
     Constructor constructor = matchingConstructor(value);
     if (constructor != null) {
diff --git a/site/_docs/reference.md b/site/_docs/reference.md
index ea9b375..e9d175a 100644
--- a/site/_docs/reference.md
+++ b/site/_docs/reference.md
@@ -1478,6 +1478,11 @@
 | CONVERT(value USING transcodingName)    | Alter *value* from one base character set to *transcodingName*
 | TRANSLATE(value USING transcodingName)  | Alter *value* from one base character set to *transcodingName*
 
+Converting a string to a **BINARY** or **VARBINARY** type produces the
+list of bytes of the string's encoding in the strings' charset.  A
+runtime error is produced if the string's characters cannot be
+represented using its charset.
+
 Supported data types syntax:
 
 {% highlight sql %}
diff --git a/testkit/src/main/java/org/apache/calcite/test/SqlOperatorTest.java b/testkit/src/main/java/org/apache/calcite/test/SqlOperatorTest.java
index e3a4c95..ed16091 100644
--- a/testkit/src/main/java/org/apache/calcite/test/SqlOperatorTest.java
+++ b/testkit/src/main/java/org/apache/calcite/test/SqlOperatorTest.java
@@ -707,6 +707,25 @@
         "CHAR(1) NOT NULL", "3");
   }
 
+  /**
+   * Test case for <a href="https://issues.apache.org/jira/browse/CALCITE-6210">
+   * Cast to VARBINARY causes an assertion failure</a>. */
+  @Test public void testVarbinaryCast() {
+    SqlOperatorFixture f = fixture();
+    f.checkScalar("CAST('00' AS VARBINARY)", "3030", "VARBINARY NOT NULL");
+    f.checkScalar("CAST('help' AS VARBINARY)", "68656c70", "VARBINARY NOT NULL");
+    f.checkScalar("CAST('help' AS VARBINARY(2))", "6865", "VARBINARY(2) NOT NULL");
+    f.checkScalar("CAST('00' AS BINARY(1))", "30", "BINARY(1) NOT NULL");
+    f.checkScalar("CAST('10' AS BINARY(2))", "3130", "BINARY(2) NOT NULL");
+    f.checkScalar("CAST('10' AS BINARY(1))", "31", "BINARY(1) NOT NULL");
+    f.checkScalar("CAST('10' AS BINARY(3))", "313000", "BINARY(3) NOT NULL");
+    f.checkScalar("CAST(_UTF8'Hello ਸੰਸਾਰ!' AS VARBINARY)",
+        "48656c6c6f20e0a8b8e0a9b0e0a8b8e0a8bee0a8b021", "VARBINARY NOT NULL");
+    f.checkFails("CAST('Hello ਸੰਸਾਰ!' AS VARBINARY)",
+        ".*Failed to encode .* in character set 'ISO-8859-1'", true);
+    f.checkNull("CAST(CAST(NULL AS VARCHAR) AS VARBINARY)");
+  }
+
   @ParameterizedTest
   @MethodSource("safeParameters")
   void testCastStringToDecimal(CastType castType, SqlOperatorFixture f) {