SQOOP-2387: Sqoop should support importing from table with column names containing some special character

(Venkat Ranganathan via Jarek Jarcec Cecho)
diff --git a/build.xml b/build.xml
index d6ddd06..148cc6d 100644
--- a/build.xml
+++ b/build.xml
@@ -876,7 +876,7 @@
     </copy>
     <junit
       printsummary="yes" showoutput="${test.output}"
-      haltonfailure="no" fork="yes" maxmemory="512m"
+      haltonfailure="no" fork="yes" maxmemory="1024m"
       errorProperty="tests.failed" failureProperty="tests.failed"
       timeout="${test.timeout}"
       dir="${build.test}/data">
diff --git a/src/docs/user/import.txt b/src/docs/user/import.txt
index 342633a..462d84b 100644
--- a/src/docs/user/import.txt
+++ b/src/docs/user/import.txt
@@ -330,6 +330,19 @@
 
 Sqoop will rise exception in case that some configured mapping will not be used.
 
+Schema name handling
+^^^^^^^^^^^^^^^^^^^^
+
+When Sqoop imports data from an enterprise store, table and column names
+may have characters that are not valid Java identifier characters or
+Avro/Parquet identifiers.  To address this, Sqoop translates these characters
+to _ as part of the schema creation.  Any column name starting with an _
+(underscore) character will be translated to have two underscore characters.
+For example _AVRO will be converted to __AVRO.
+
+In the case of HCatalog imports, column names are converted to lower case when
+mapped to HCatalog columns.  This may change in the future.
+
 Incremental Imports
 ^^^^^^^^^^^^^^^^^^^
 
diff --git a/src/java/org/apache/sqoop/avro/AvroUtil.java b/src/java/org/apache/sqoop/avro/AvroUtil.java
index 9036076..dffbf6e 100644
--- a/src/java/org/apache/sqoop/avro/AvroUtil.java
+++ b/src/java/org/apache/sqoop/avro/AvroUtil.java
@@ -88,7 +88,8 @@
    * Convert Column name into Avro column name.
    */
   public static String toAvroColumn(String column) {
-    return toAvroIdentifier(column);
+    String candidate = ClassWriter.toJavaIdentifier(column);
+    return toAvroIdentifier(candidate);
   }
 
   /**
diff --git a/src/java/org/apache/sqoop/manager/HsqldbManager.java b/src/java/org/apache/sqoop/manager/HsqldbManager.java
index fefac3f..9b9c582 100644
--- a/src/java/org/apache/sqoop/manager/HsqldbManager.java
+++ b/src/java/org/apache/sqoop/manager/HsqldbManager.java
@@ -66,6 +66,11 @@
   }
 
   @Override
+  public String escapeColName(String colName) {
+    return '"' + colName + '"';
+  }
+
+  @Override
   /**
    * {@inheritDoc}
    */
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportHelper.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportHelper.java
index 878f765..aba2458 100644
--- a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportHelper.java
+++ b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatImportHelper.java
@@ -112,7 +112,8 @@
       hCatFullTableSchema.append(hfs);
     }
     fieldCount = hCatFullTableSchema.size();
-    lobLoader = new LargeObjectLoader(conf, new Path(jobInfo.getTableInfo().getTableLocation()));
+    lobLoader = new LargeObjectLoader(conf, new Path(jobInfo.getTableInfo()
+      .getTableLocation()));
     bigDecimalFormatString = conf.getBoolean(
       ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT,
       ImportJobBase.PROPERTY_BIGDECIMAL_FORMAT_DEFAULT);
@@ -179,7 +180,12 @@
       if (skip) {
         continue;
       }
-      HCatFieldSchema hfs = hCatFullTableSchema.get(hfn);
+      HCatFieldSchema hfs = null;
+      try {
+        hfs = hCatFullTableSchema.get(hfn);
+      } catch (Exception e) {
+        throw new IOException("Unable to lookup " + hfn + " in the hcat schema", e);
+      }
       if (debugHCatImportMapper) {
         LOG.debug("SqoopRecordVal: field = " + key + " Val " + val
           + " of type " + (val == null ? null : val.getClass().getName())
diff --git a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatUtilities.java b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatUtilities.java
index 4070c24..9d5a9ee 100644
--- a/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatUtilities.java
+++ b/src/java/org/apache/sqoop/mapreduce/hcat/SqoopHCatUtilities.java
@@ -387,10 +387,15 @@
 
     List<HCatFieldSchema> outputFieldList = new ArrayList<HCatFieldSchema>();
     for (String col : dbColumnNames) {
-      HCatFieldSchema hfs = hCatFullTableSchema.get(col);
-      if (hfs == null) {
-        throw new IOException("Database column " + col + " not found in "
-          + " hcatalog table.");
+      HCatFieldSchema hfs = null;
+      try {
+        hfs = hCatFullTableSchema.get(col);
+      } catch (Exception e) {
+        throw new IOException("Caught Exception checking database column " + col
+            + " in hcatalog table.", e);
+      }
+      if (hfs == null) {
+        throw new IOException("Database column " + col + " not found in hcatalog table.");
       }
       boolean skip=false;
       if (hCatStaticPartitionKeys != null) {
@@ -482,6 +487,20 @@
       } else {
         colNames = connManager.getColumnNamesForQuery(options.getSqlQuery());
       }
+    } else {
+      String[] fixedColNames = new String[colNames.length];
+      for (int i = 0; i < colNames.length; ++i) {
+        String userColName = colNames[i];
+
+        // Remove surrounding quotes if any
+        int len = userColName.length();
+        if (len > 2 && userColName.charAt(0) == '"' &&  userColName.charAt(len -1) == '"') {
+          userColName = userColName.substring(1, len -1);
+        }
+
+        fixedColNames[i] = userColName;
+      }
+      colNames = fixedColNames;
     }
 
     dbColumnNames = new String[colNames.length];
diff --git a/src/java/org/apache/sqoop/orm/ClassWriter.java b/src/java/org/apache/sqoop/orm/ClassWriter.java
index 1c6f7f4..bf40d2c 100644
--- a/src/java/org/apache/sqoop/orm/ClassWriter.java
+++ b/src/java/org/apache/sqoop/orm/ClassWriter.java
@@ -274,7 +274,7 @@
      * column "9isLegalInSql" was translated into "_9isLegalInSql" in original
      * code and will translate to same "_9isLegalInSql" now. However it would
      * be translated to "__9isLegalInSql" (notice that there are two "_" at the
-     * begging) if we would use output.startsWith instead.
+     * beginning) if we would use output.startsWith instead.
      */
     } else if (candidate.startsWith("_")) {
       return "_" + output;
@@ -1062,7 +1062,7 @@
    * @param sb - StringBuilder to append code to
    */
   private void generateSetField(Map<String, Integer> columnTypes,
-      String [] colNames, StringBuilder sb) {
+      String [] colNames, String [] rawColNames, StringBuilder sb) {
 
     int numberOfMethods =
             this.getNumberOfMethods(colNames, maxColumnsPerMethod);
@@ -1083,9 +1083,9 @@
       }
     } else {
       boolean first = true;
-      for (String colName : colNames) {
-        int sqlType = columnTypes.get(colName);
-        String javaType = toJavaType(colName, sqlType);
+      for (int i = 0; i < colNames.length; i++) {
+        int sqlType = columnTypes.get(colNames[i]);
+        String javaType = toJavaType(colNames[i], sqlType);
         if (null == javaType) {
           continue;
         } else {
@@ -1093,8 +1093,8 @@
             sb.append("    else");
           }
 
-          sb.append("    if (\"" + colName + "\".equals(__fieldName)) {\n");
-          sb.append("      this." + colName + " = (" + javaType
+          sb.append("    if (\"" + rawColNames[i] + "\".equals(__fieldName)) {\n");
+          sb.append("      this." + colNames[i] + " = (" + javaType
               + ") __fieldVal;\n");
           sb.append("    }\n");
           first = false;
@@ -1108,7 +1108,7 @@
     sb.append("  }\n");
 
     for (int i = 0; i < numberOfMethods; ++i) {
-      myGenerateSetField(columnTypes, colNames, sb, i, maxColumnsPerMethod);
+      myGenerateSetField(columnTypes, colNames, rawColNames, sb, i, maxColumnsPerMethod);
     }
   }
 
@@ -1121,7 +1121,7 @@
    * @param size - number of columns per method
    */
   private void myGenerateSetField(Map<String, Integer> columnTypes,
-                                  String[] colNames, StringBuilder sb,
+                                  String[] colNames, String[] rawColNames, StringBuilder sb,
                                   int methodNumber, int size) {
     sb.append("  public boolean setField" + methodNumber
             + "(String __fieldName, Object __fieldVal) {\n");
@@ -1130,6 +1130,7 @@
     for (int i = methodNumber * size;
          i < topBoundary(colNames, methodNumber, size); ++i) {
       String colName = colNames[i];
+      String rawColName = rawColNames[i];
       int sqlType = columnTypes.get(colName);
       String javaType = toJavaType(colName, sqlType);
       if (null == javaType) {
@@ -1139,7 +1140,7 @@
           sb.append("    else");
         }
 
-        sb.append("    if (\"" + colName + "\".equals(__fieldName)) {\n");
+        sb.append("    if (\"" + rawColName + "\".equals(__fieldName)) {\n");
         sb.append("      this." + colName + " = (" + javaType
             + ") __fieldVal;\n");
         sb.append("      return true;\n");
@@ -1160,7 +1161,7 @@
    * @param sb - StringBuilder to append code to
    */
   private void generateGetFieldMap(Map<String, Integer> columnTypes,
-      String [] colNames, StringBuilder sb) {
+      String [] colNames, String [] rawColNames, StringBuilder sb) {
     int numberOfMethods =
             this.getNumberOfMethods(colNames, maxColumnsPerMethod);
 
@@ -1172,14 +1173,14 @@
         sb.append("    this.getFieldMap" + i + "(__sqoop$field_map);\n");
       }
     } else {
-      myGenerateGetFieldMap(columnTypes, colNames, sb, 0,
+      myGenerateGetFieldMap(columnTypes, colNames, rawColNames, sb, 0,
               maxColumnsPerMethod, false);
     }
     sb.append("    return __sqoop$field_map;\n");
     sb.append("  }\n\n");
 
     for (int i = 0; i < numberOfMethods; ++i) {
-      myGenerateGetFieldMap(columnTypes, colNames, sb, i,
+      myGenerateGetFieldMap(columnTypes, colNames, rawColNames, sb, i,
               maxColumnsPerMethod, true);
     }
   }
@@ -1194,7 +1195,7 @@
    * @param wrapInMethod - wrap body in a method.
    */
   private void myGenerateGetFieldMap(Map<String, Integer> columnTypes,
-                                     String[] colNames, StringBuilder sb,
+                                     String[] colNames, String[] rawColNames, StringBuilder sb,
                                      int methodNumber, int size,
                                      boolean wrapInMethod) {
     if (wrapInMethod) {
@@ -1204,9 +1205,8 @@
 
     for (int i = methodNumber * size;
          i < topBoundary(colNames, methodNumber, size); ++i) {
-      String colName = colNames[i];
-      sb.append("    __sqoop$field_map.put(\"" + colName + "\", this."
-          + colName + ");\n");
+      sb.append("    __sqoop$field_map.put(\"" + rawColNames[i] + "\", this."
+          + colNames[i] + ");\n");
     }
 
     if (wrapInMethod) {
@@ -1618,7 +1618,7 @@
    * @return a list of column names in the same order which are
    * cleaned up to be used as identifiers in the generated Java class.
    */
-  private String [] cleanColNames(String [] colNames) {
+  public static String [] cleanColNames(String [] colNames) {
     String [] cleanedColNames = new String[colNames.length];
     for (int i = 0; i < colNames.length; i++) {
       String col = colNames[i];
@@ -1734,7 +1734,7 @@
 
     // Generate the Java code.
     StringBuilder sb = generateClassForColumns(columnTypes,
-        cleanedColNames, cleanedDbWriteColNames);
+        cleanedColNames, cleanedDbWriteColNames, colNames);
     // Write this out to a file in the jar output directory.
     // We'll move it to the user-visible CodeOutputDir after compiling.
     String codeOutDir = options.getJarOutputDir();
@@ -1811,7 +1811,18 @@
       // These column names were provided by the user. They may not be in
       // the same case as the keys in the columnTypes map. So make sure
       // we add the appropriate aliases in that map.
-      for (String userColName : colNames) {
+      // We also have to strip surrounding quotes if any.
+      String[] fixedColNames = new String[colNames.length];
+      for (int i = 0; i < colNames.length; ++i) {
+        String userColName = colNames[i];
+
+        // Remove surrounding quotes if any
+        int len = userColName.length();
+        if (len > 2 && userColName.charAt(0) == '"' &&  userColName.charAt(len -1) == '"') {
+          userColName = userColName.substring(1, len -1);
+        }
+
+        fixedColNames[i] = userColName;
         for (Map.Entry<String, Integer> typeEntry : columnTypes.entrySet()) {
           String typeColName = typeEntry.getKey();
           if (typeColName.equalsIgnoreCase(userColName)
@@ -1824,6 +1835,7 @@
           }
         }
       }
+      colNames = fixedColNames;
     }
     return colNames;
   }
@@ -1846,7 +1858,7 @@
    */
   private StringBuilder generateClassForColumns(
       Map<String, Integer> columnTypes,
-      String [] colNames, String [] dbWriteColNames) {
+      String [] colNames, String [] dbWriteColNames, String [] rawColNames) {
     if (colNames.length ==0) {
       throw new IllegalArgumentException("Attempted to generate class with "
           + "no columns!");
@@ -1919,8 +1931,8 @@
     generateToString(columnTypes, colNames, sb);
     generateParser(columnTypes, colNames, sb);
     generateCloneMethod(columnTypes, colNames, sb);
-    generateGetFieldMap(columnTypes, colNames, sb);
-    generateSetField(columnTypes, colNames, sb);
+    generateGetFieldMap(columnTypes, colNames, rawColNames, sb);
+    generateSetField(columnTypes, colNames, rawColNames, sb);
 
     // TODO(aaron): Generate hashCode(), compareTo(), equals() so it can be a
     // WritableComparable
diff --git a/src/test/com/cloudera/sqoop/TestAvroImport.java b/src/test/com/cloudera/sqoop/TestAvroImport.java
index 260e80a..af4b481 100644
--- a/src/test/com/cloudera/sqoop/TestAvroImport.java
+++ b/src/test/com/cloudera/sqoop/TestAvroImport.java
@@ -227,6 +227,27 @@
     assertEquals("AVRO1", 1987, record1.get("AVRO1"));
   }
 
+  public void testNonIdentCharactersInColumnName() throws IOException {
+    String [] names = { "test_a-v+r/o" };
+    String [] types = { "INT" };
+    String [] vals = { "2015" };
+    createTableWithColTypesAndNames(names, types, vals);
+
+    runImport(getOutputArgv(true, null));
+
+    Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
+    DataFileReader<GenericRecord> reader = read(outputFile);
+    Schema schema = reader.getSchema();
+    assertEquals(Schema.Type.RECORD, schema.getType());
+    List<Field> fields = schema.getFields();
+    assertEquals(types.length, fields.size());
+
+    checkField(fields.get(0), "TEST_A_V_R_O", Type.INT);
+
+    GenericRecord record1 = reader.next();
+    assertEquals("TEST_A_V_R_O", 2015, record1.get("TEST_A_V_R_O"));
+  }
+
   private void checkField(Field field, String name, Type type) {
     assertEquals(name, field.name());
     assertEquals(Schema.Type.UNION, field.schema().getType());
diff --git a/src/test/com/cloudera/sqoop/TestIncrementalImport.java b/src/test/com/cloudera/sqoop/TestIncrementalImport.java
index 3e3f929..8ea9bca 100644
--- a/src/test/com/cloudera/sqoop/TestIncrementalImport.java
+++ b/src/test/com/cloudera/sqoop/TestIncrementalImport.java
@@ -514,10 +514,10 @@
       args.add("append");
       if (!appendTimestamp) {
         args.add("--check-column");
-        args.add("id");
+        args.add("ID");
       } else {
         args.add("--check-column");
-        args.add("last_modified");
+        args.add("LAST_MODIFIED");
       }
     } else {
       args.add("--incremental");
@@ -526,7 +526,7 @@
       args.add("LAST_MODIFIED");
     }
     args.add("--columns");
-    args.add("id");
+    args.add("ID");
     args.add("-m");
     args.add("1");
 
@@ -557,10 +557,10 @@
       args.add("append");
       if (!appendTimestamp) {
         args.add("--check-column");
-        args.add("id");
+        args.add("ID");
       } else {
         args.add("--check-column");
-        args.add("last_modified");
+        args.add("LAST_MODIFIED");
       }
     } else {
       args.add("--incremental");
diff --git a/src/test/com/cloudera/sqoop/TestParquetImport.java b/src/test/com/cloudera/sqoop/TestParquetImport.java
index 43995c1..2a6dd6c 100644
--- a/src/test/com/cloudera/sqoop/TestParquetImport.java
+++ b/src/test/com/cloudera/sqoop/TestParquetImport.java
@@ -21,12 +21,15 @@
 import com.cloudera.sqoop.testutil.CommonArgs;
 import com.cloudera.sqoop.testutil.HsqldbTestServer;
 import com.cloudera.sqoop.testutil.ImportJobTestCase;
+
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.Schema.Type;
+import org.apache.avro.file.DataFileReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
 import org.kitesdk.data.CompressionType;
 import org.kitesdk.data.Dataset;
 import org.kitesdk.data.DatasetReader;
@@ -205,6 +208,31 @@
     }
   }
 
+  public void testNonIdentCharactersInColumnName() throws IOException {
+    String [] names = { "test_p-a+r/quet" };
+    String [] types = { "INT" };
+    String [] vals = { "2015" };
+    createTableWithColTypesAndNames(names, types, vals);
+
+    runImport(getOutputArgv(true, null));
+
+    Schema schema = getSchema();
+    assertEquals(Type.RECORD, schema.getType());
+    List<Field> fields = schema.getFields();
+    assertEquals(types.length, fields.size());
+    checkField(fields.get(0), "TEST_P_A_R_QUET", Type.INT);
+
+    DatasetReader<GenericRecord> reader = getReader();
+    try {
+      assertTrue(reader.hasNext());
+      GenericRecord record1 = reader.next();
+      assertEquals("TEST_P_A_R_QUET", 2015, record1.get("TEST_P_A_R_QUET"));
+      assertFalse(reader.hasNext());
+    } finally {
+      reader.close();
+    }
+  }
+
   public void testNullableParquetImport() throws IOException, SQLException {
     String [] types = { "INT" };
     String [] vals = { null };
diff --git a/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
index e3098d6..99e4293 100644
--- a/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
+++ b/src/test/com/cloudera/sqoop/testutil/BaseSqoopTestCase.java
@@ -317,7 +317,7 @@
         conn = getManager().getConnection();
 
         for (int i = 0; i < colTypes.length; i++) {
-          columnDefStr += colNames[i] + " " + colTypes[i];
+          columnDefStr += '"' + colNames[i].toUpperCase() + '"' + " " + colTypes[i];
           if (i < colTypes.length - 1) {
             columnDefStr += ", ";
           }
@@ -349,7 +349,7 @@
         String columnListStr = "";
         String valueListStr = "";
         for (int i = 0; i < colTypes.length; i++) {
-          columnListStr += colNames[i];
+          columnListStr += '"' + colNames[i].toUpperCase() + '"';
           valueListStr += vals[count * colTypes.length + i];
           if (i < colTypes.length - 1) {
             columnListStr += ", ";
diff --git a/src/test/org/apache/sqoop/hcat/HCatalogImportTest.java b/src/test/org/apache/sqoop/hcat/HCatalogImportTest.java
index d97b870..8db06d1 100644
--- a/src/test/org/apache/sqoop/hcat/HCatalogImportTest.java
+++ b/src/test/org/apache/sqoop/hcat/HCatalogImportTest.java
@@ -258,7 +258,11 @@
       for (ColumnGenerator col : cols) {
         String name = col.getName().toLowerCase();
         expectedVal = col.getHCatValue(i);
-        actualVal = rec.get(name, schema);
+        try {
+          actualVal = rec.get(name, schema);
+        } catch (Exception e) {
+          throw new IOException("Unable to get value for field " + name + " from hcat record", e);
+        }
         LOG.info("Validating field: " + name + " (expected = "
           + expectedVal + ", actual = " + actualVal + ")");
         HCatalogTestUtils.assertEquals(expectedVal, actualVal);
@@ -320,7 +324,7 @@
       colNames[0] = "ID";
       colNames[1] = "MSG";
       for (int i = 0; i < cols.length; ++i) {
-        colNames[2 + i] = cols[i].getName().toUpperCase();
+        colNames[2 + i] = cols[i].getName().toUpperCase();
       }
     }
     String[] importArgs;
@@ -850,4 +854,30 @@
         + " create-hcatalog-table with pre-existing table test", e);
     }
   }
+  public void testTableWithNonIdentColChars() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator("A-B+C*D/E",
+        "tinyint", Types.INTEGER, HCatFieldSchema.Type.INT, 0, 0, 10,
+        10, KeyType.NOT_A_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    setExtraArgs(addlArgsArray);
+    runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols, null);
+  }
+  public void testTableCreationWithNonIdentColChars() throws Exception {
+    final int TOTAL_RECORDS = 1 * 10;
+    String table = getTableName().toUpperCase();
+    ColumnGenerator[] cols = new ColumnGenerator[] {
+      HCatalogTestUtils.colGenerator("A-B+C*D/E",
+        "tinyint", Types.INTEGER, HCatFieldSchema.Type.INT, 0, 0, 10,
+        10, KeyType.NOT_A_KEY),
+    };
+    List<String> addlArgsArray = new ArrayList<String>();
+    addlArgsArray.add("--create-hcatalog-table");
+    setExtraArgs(addlArgsArray);
+    runHCatImport(addlArgsArray, TOTAL_RECORDS, table, cols,
+      null, true, false);
+  }
 }
diff --git a/src/test/org/apache/sqoop/hcat/HCatalogTestUtils.java b/src/test/org/apache/sqoop/hcat/HCatalogTestUtils.java
index 32c267f..f4b1ea9 100644
--- a/src/test/org/apache/sqoop/hcat/HCatalogTestUtils.java
+++ b/src/test/org/apache/sqoop/hcat/HCatalogTestUtils.java
@@ -448,7 +448,7 @@
    * @return the name of the column
    */
   public static String forIdx(int idx) {
-    return "col" + idx;
+    return "COL" + idx;
   }
 
   public static ColumnGenerator colGenerator(final String name,
@@ -657,10 +657,10 @@
     StringBuilder sb = new StringBuilder();
     sb.append("CREATE TABLE ");
     sb.append(tableName);
-    sb.append(" (id INT NOT NULL PRIMARY KEY, msg VARCHAR(64)");
+    sb.append(" (ID INT NOT NULL PRIMARY KEY, MSG VARCHAR(64)");
     int colNum = 0;
     for (ColumnGenerator gen : extraCols) {
-      sb.append(", " + forIdx(colNum++) + " " + gen.getDBTypeString());
+      sb.append(", \"" + gen.getName() + "\" " + gen.getDBTypeString());
     }
     sb.append(")");
     String cmd = sb.toString();
@@ -674,13 +674,12 @@
     sb.append("INSERT INTO ");
     sb.append(tableName);
     sb.append(" (id, msg");
-    int colNum = 0;
     for (int i = 0; i < extraCols.length; ++i) {
-      sb.append(", " + forIdx(colNum++));
+      sb.append(", \"").append(extraCols[i].getName()).append('"');
     }
     sb.append(") VALUES ( ?, ?");
     for (int i = 0; i < extraCols.length; ++i) {
-      sb.append(",?");
+      sb.append(", ?");
     }
     sb.append(")");
     String s = sb.toString();
@@ -809,7 +808,7 @@
             break;
         }
         hCatTblCols
-          .add(new HCatFieldSchema(gen.getName(), tInfo, ""));
+          .add(new HCatFieldSchema(gen.getName().toLowerCase(), tInfo, ""));
       }
     }
     HCatSchema hCatTblSchema = new HCatSchema(hCatTblCols);
@@ -840,7 +839,7 @@
             break;
         }
         hCatPartCols
-          .add(new HCatFieldSchema(gen.getName(), tInfo, ""));
+          .add(new HCatFieldSchema(gen.getName().toLowerCase(), tInfo, ""));
       }
     }
     HCatSchema hCatPartSchema = new HCatSchema(hCatPartCols);
@@ -875,7 +874,7 @@
             break;
         }
         hCatPartCols
-          .add(new HCatFieldSchema(gen.getName(), tInfo, ""));
+          .add(new HCatFieldSchema(gen.getName().toLowerCase(), tInfo, ""));
       }
     }
     HCatSchema hCatPartSchema = new HCatSchema(hCatPartCols);