DRILL-8280: Cannot ANALYZE files containing non-ASCII column names (#2625)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/SchemaFunctions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/SchemaFunctions.java
index b3d2414..d947a7b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/SchemaFunctions.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/SchemaFunctions.java
@@ -151,8 +151,9 @@
       }
 
       org.apache.drill.exec.record.metadata.TupleMetadata currentSchema =
-          org.apache.drill.exec.expr.fn.impl.SchemaFunctions.getTupleMetadata(
-              org.apache.drill.common.util.DrillStringUtils.toBinaryString(input.buffer, input.start, input.end));
+        org.apache.drill.exec.expr.fn.impl.SchemaFunctions.getTupleMetadata(
+          org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input.start, input.end, input.buffer));
+
       if (schemaHolder.obj == null) {
         schemaHolder.obj = currentSchema;
         return;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
index 5172e47..7f42403 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/sql/TestMetastoreCommands.java
@@ -3568,6 +3568,34 @@
     }
   }
 
+  @Test // DRILL-8280
+  public void testNonAsciiColumnName() throws Exception {
+    String tableName = "utf8_col_name";
+    String colName = "Käse";
+
+    run("create table dfs.tmp.%s as select 'Cheddar' as `%s`", tableName, colName);
+    try {
+      testBuilder()
+        .sqlQuery("analyze table dfs.tmp.`%s` refresh metadata", tableName)
+        .unOrdered()
+        .baselineColumns("ok", "summary")
+        .baselineValues(true, String.format("Collected / refreshed metadata for table [dfs.tmp.%s]", tableName))
+        .go();
+      String query = "select column_name from information_schema.`columns` where table_name='%s' and column_name='%s'";
+
+      testBuilder()
+        .sqlQuery(query, tableName, colName)
+        .unOrdered()
+        .baselineColumns("column_name")
+        .baselineValues(colName)
+        .go();
+    } finally {
+      run("analyze table dfs.tmp.`%s` drop metadata if exists", tableName);
+      run("drop table if exists dfs.tmp.`%s`", tableName);
+    }
+  }
+
+
   public static <T> ColumnStatistics<T> getColumnStatistics(T minValue, T maxValue, long rowCount,
                                                             TypeProtos.MinorType minorType) {
     return new ColumnStatistics<>(