SQOOP-3123: Introduce escaping logic for column mapping parameters (the
same escaping Sqoop already applies to DB column names), so that special
column names (e.g. ones containing the '#' character) and the mappings
related to those columns share the same format (avoiding end-user
confusion), and the related Avro format clashing issues are eliminated.
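
A minimal sketch of the intended behaviour (illustrative only, not part of
the patch): it assumes SqoopOptions.setMapColumnJava() accepts the same
"column=type" string as --map-column-java, reuses the "DATA#COL1" column
name from the new TestAvroImport cases, and the wrapper class name is made
up for the example.

    import java.util.Properties;
    import org.apache.sqoop.SqoopOptions;

    public class EscapeMappingSketch {
      public static void main(String[] args) {
        SqoopOptions opts = new SqoopOptions();
        // same syntax as the --map-column-java command line argument
        opts.setMapColumnJava("DATA#COL1=String");

        // escaping is enabled by default since SQOOP-3123, so the mapping key
        // is normalized with ClassWriter.toJavaIdentifier(), the same routine
        // used for the generated field names; the lookup done in
        // ClassWriter.toJavaType() therefore succeeds for the escaped name
        Properties escaped = opts.getColumnNames();
        System.out.println(escaped.getProperty("DATA_COL1"));   // prints "String"

        // --escape-mapping-column-names false restores the old behaviour
        opts.setEscapeMappingColumnNamesEnabled(false);
        System.out.println(opts.getColumnNames().getProperty("DATA#COL1")); // prints "String"
      }
    }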

(Dmitry Zagorulkin via Attila Szabo)
diff --git a/src/java/org/apache/sqoop/SqoopOptions.java b/src/java/org/apache/sqoop/SqoopOptions.java
index caf95f6..c3f4604 100644
--- a/src/java/org/apache/sqoop/SqoopOptions.java
+++ b/src/java/org/apache/sqoop/SqoopOptions.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.sqoop.accumulo.AccumuloConstants;
 import org.apache.sqoop.mapreduce.mainframe.MainframeConfiguration;
+import org.apache.sqoop.tool.BaseSqoopTool;
 import org.apache.sqoop.util.CredentialsUtil;
 import org.apache.sqoop.util.LoggingUtils;
 import org.apache.sqoop.util.SqoopJsonUtil;
@@ -53,6 +54,7 @@
 import com.cloudera.sqoop.util.StoredAsProperty;
 
 import static org.apache.sqoop.Sqoop.SQOOP_RETHROW_PROPERTY;
+import static org.apache.sqoop.orm.ClassWriter.toJavaIdentifier;
 
 /**
  * Configurable state used by Sqoop tools.
@@ -78,7 +80,6 @@
   public static final String DEF_HCAT_HOME_OLD = "/usr/lib/hcatalog";
 
   public static final boolean METASTORE_PASSWORD_DEFAULT = false;
-
   /**
    * Thrown when invalid cmdline options are given.
    */
@@ -201,6 +202,9 @@
   // User explicit mapping of types
   private Properties mapColumnJava; // stored as map.colum.java
   private Properties mapColumnHive; // stored as map.column.hive
+  // SQOOP-3123: escape special characters in column mapping keys (enabled by default)
+  private boolean escapeColumnMappingEnabled;
+  private Properties mapReplacedColumnJava; // column mapping keyed by the escaped column names
 
   // An ordered list of column names denoting what order columns are
   // serialized to a PreparedStatement from a generated record type.
@@ -874,6 +878,10 @@
         other.mapColumnJava = (Properties) this.mapColumnJava.clone();
       }
 
+      if (null != mapReplacedColumnJava) {
+        other.mapReplacedColumnJava = (Properties) this.mapReplacedColumnJava.clone();
+      }
+
       return other;
     } catch (CloneNotSupportedException cnse) {
       // Shouldn't happen.
@@ -1064,6 +1072,9 @@
 
     // set default metadata transaction isolation level to TRANSACTION_READ_COMMITTED
     this.metadataTransactionIsolationLevel = Connection.TRANSACTION_READ_COMMITTED;
+
+    // escape special characters in column mapping keys by default (SQOOP-3123)
+    this.escapeColumnMappingEnabled = true;
   }
 
   /**
@@ -2734,4 +2745,38 @@
     getConf().setBoolean(ORACLE_ESCAPING_DISABLED, escapingDisabled);
   }
 
+  public void setEscapeMappingColumnNamesEnabled(boolean escapingEnabled) {
+    this.escapeColumnMappingEnabled = escapingEnabled;
+    // A custom setter is needed so that the option is also available through
+    // the Hadoop configuration in places where SqoopOptions is not reachable.
+    getConf().setBoolean(BaseSqoopTool.ESCAPE_MAPPING_COLUMN_NAMES_ENABLED, escapingEnabled);
+  }
+
+  public boolean getEscapeMappingColumnNamesEnabled() {
+    return escapeColumnMappingEnabled;
+  }
+
+  public Properties getColumnNames() {
+    if (escapeColumnMappingEnabled && null == mapReplacedColumnJava) {
+      return doCleanColumnMapping();
+    }
+    return escapeColumnMappingEnabled ? mapReplacedColumnJava : mapColumnJava;
+  }
+
+  private Properties doCleanColumnMapping() {
+    mapReplacedColumnJava = new Properties();
+
+    if (!mapColumnJava.isEmpty()) {
+      for (Map.Entry<Object, Object> entry : mapColumnJava.entrySet()) {
+        String candidate = toJavaIdentifier((String) entry.getKey());
+        mapReplacedColumnJava.put(candidate, mapColumnJava.getProperty((String) entry.getKey()));
+      }
+      return mapReplacedColumnJava;
+    }
+
+    return mapColumnJava;
+  }
+
+
 }
+
diff --git a/src/java/org/apache/sqoop/orm/ClassWriter.java b/src/java/org/apache/sqoop/orm/ClassWriter.java
index c18a36f..eaa9123 100644
--- a/src/java/org/apache/sqoop/orm/ClassWriter.java
+++ b/src/java/org/apache/sqoop/orm/ClassWriter.java
@@ -39,18 +39,18 @@
 import org.apache.sqoop.mapreduce.ImportJobBase;
 
 import com.cloudera.sqoop.SqoopOptions;
-import com.cloudera.sqoop.manager.ConnManager;
 import com.cloudera.sqoop.lib.BigDecimalSerializer;
+import com.cloudera.sqoop.lib.BlobRef;
 import com.cloudera.sqoop.lib.BooleanParser;
+import com.cloudera.sqoop.lib.ClobRef;
 import com.cloudera.sqoop.lib.DelimiterSet;
 import com.cloudera.sqoop.lib.FieldFormatter;
 import com.cloudera.sqoop.lib.JdbcWritableBridge;
 import com.cloudera.sqoop.lib.LargeObjectLoader;
 import com.cloudera.sqoop.lib.LobSerializer;
 import com.cloudera.sqoop.lib.RecordParser;
-import com.cloudera.sqoop.lib.BlobRef;
-import com.cloudera.sqoop.lib.ClobRef;
 import com.cloudera.sqoop.lib.SqoopRecord;
+import com.cloudera.sqoop.manager.ConnManager;
 
 /**
  * Creates an ORM class to represent a table from a database.
@@ -289,10 +289,9 @@
   }
 
   private String toJavaType(String columnName, int sqlType) {
-    Properties mapping = options.getMapColumnJava();
-
-    if (mapping.containsKey(columnName)) {
-      String type = mapping.getProperty(columnName);
+    Properties columnMapping = options.getColumnNames();
+    if (null != columnMapping && columnMapping.containsKey(columnName)) {
+      String type = (String) columnMapping.get(columnName);
       if (LOG.isDebugEnabled()) {
         LOG.info("Overriding type of column " + columnName + " to " + type);
       }
@@ -1705,7 +1704,8 @@
     }
 
     // Check that all explicitly mapped columns are present in result set
-    Properties mapping = options.getMapColumnJava();
+    Properties mapping = options.getColumnNames();
+
     if (mapping != null && !mapping.isEmpty()) {
       for(Object column : mapping.keySet()) {
         if (!uniqColNames.contains((String)column)) {
diff --git a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
index 3d37859..46f405f 100644
--- a/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
+++ b/src/java/org/apache/sqoop/tool/BaseSqoopTool.java
@@ -172,6 +172,7 @@
   public static final String RELAXED_ISOLATION = "relaxed-isolation";
   public static final String THROW_ON_ERROR_ARG = "throw-on-error";
   public static final String ORACLE_ESCAPING_DISABLED = "oracle-escaping-disabled";
+  public static final String ESCAPE_MAPPING_COLUMN_NAMES_ENABLED = "escape-mapping-column-names";
 
   // Arguments for validation.
   public static final String VALIDATE_ARG = "validate";
@@ -787,6 +788,12 @@
         .withDescription("Override mapping for specific columns to java types")
         .withLongOpt(MAP_COLUMN_JAVA)
         .create());
+    codeGenOpts.addOption(OptionBuilder
+        .hasArg()
+        .withDescription("Enable/disable escaping of special characters in column names (default: true)")
+        .withLongOpt(ESCAPE_MAPPING_COLUMN_NAMES_ENABLED)
+        .withArgName("boolean")
+        .create());
 
     if (!multiTable) {
       codeGenOpts.addOption(OptionBuilder.withArgName("name")
@@ -1084,6 +1091,11 @@
     if (in.hasOption(ORACLE_ESCAPING_DISABLED)) {
       out.setOracleEscapingDisabled(Boolean.parseBoolean(in.getOptionValue(ORACLE_ESCAPING_DISABLED)));
     }
+
+    if (in.hasOption(ESCAPE_MAPPING_COLUMN_NAMES_ENABLED)) {
+      out.setEscapeMappingColumnNamesEnabled(Boolean.parseBoolean(in.getOptionValue(
+          ESCAPE_MAPPING_COLUMN_NAMES_ENABLED)));
+    }
   }
 
   private void applyCredentialsOptions(CommandLine in, SqoopOptions out)
@@ -1355,6 +1367,11 @@
     if (!multiTable && in.hasOption(CLASS_NAME_ARG)) {
       out.setClassName(in.getOptionValue(CLASS_NAME_ARG));
     }
+
+    if (in.hasOption(ESCAPE_MAPPING_COLUMN_NAMES_ENABLED)) {
+      out.setEscapeMappingColumnNamesEnabled(Boolean.parseBoolean(in.getOptionValue(
+          ESCAPE_MAPPING_COLUMN_NAMES_ENABLED)));
+    }
   }
 
   protected void applyHBaseOptions(CommandLine in, SqoopOptions out) {
diff --git a/src/java/org/apache/sqoop/tool/ImportTool.java b/src/java/org/apache/sqoop/tool/ImportTool.java
index d1c9749..4b1b12d 100644
--- a/src/java/org/apache/sqoop/tool/ImportTool.java
+++ b/src/java/org/apache/sqoop/tool/ImportTool.java
@@ -28,18 +28,15 @@
 import java.util.List;
 import java.util.Map;
 
-import com.cloudera.sqoop.mapreduce.MergeJob;
-import com.cloudera.sqoop.orm.TableClassName;
-import com.cloudera.sqoop.util.ClassLoaderStack;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.sqoop.avro.AvroSchemaMismatchException;
 
 import com.cloudera.sqoop.SqoopOptions;
 import com.cloudera.sqoop.SqoopOptions.InvalidOptionsException;
@@ -47,12 +44,14 @@
 import com.cloudera.sqoop.cli.ToolOptions;
 import com.cloudera.sqoop.hive.HiveImport;
 import com.cloudera.sqoop.manager.ImportJobContext;
+import com.cloudera.sqoop.mapreduce.MergeJob;
 import com.cloudera.sqoop.metastore.JobData;
 import com.cloudera.sqoop.metastore.JobStorage;
 import com.cloudera.sqoop.metastore.JobStorageFactory;
+import com.cloudera.sqoop.orm.TableClassName;
 import com.cloudera.sqoop.util.AppendUtils;
+import com.cloudera.sqoop.util.ClassLoaderStack;
 import com.cloudera.sqoop.util.ImportException;
-import org.apache.sqoop.avro.AvroSchemaMismatchException;
 
 import static org.apache.sqoop.manager.SupportedManagers.MYSQL;
 
@@ -1007,6 +1006,11 @@
         out.setAutoResetToOneMapper(true);
       }
 
+      if (in.hasOption(ESCAPE_MAPPING_COLUMN_NAMES_ENABLED)) {
+        out.setEscapeMappingColumnNamesEnabled(Boolean.parseBoolean(in.getOptionValue(
+            ESCAPE_MAPPING_COLUMN_NAMES_ENABLED)));
+      }
+
       applyIncrementalOptions(in, out);
       applyHiveOptions(in, out);
       applyOutputFormatOptions(in, out);
diff --git a/src/test/com/cloudera/sqoop/TestAvroImport.java b/src/test/com/cloudera/sqoop/TestAvroImport.java
index 26edd4c..da79c7a 100644
--- a/src/test/com/cloudera/sqoop/TestAvroImport.java
+++ b/src/test/com/cloudera/sqoop/TestAvroImport.java
@@ -48,6 +48,7 @@
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -317,7 +318,49 @@
 
     GenericRecord record1 = reader.next();
     assertNull(record1.get("DATA_COL0"));
+  }
 
+  @Test
+  public void testSpecialCharactersInColumnMappingWithConversion() throws IOException, SQLException {
+    // escaping enabled by default
+    String [] extraArgsEscapeColNamesWithMapping = { "--map-column-java",
+        "INTFIELD1=String,DATA_#_COL0=String,DATA#COL1=String,DATA___COL2=String"};
+
+    // disable escaping
+    String [] extraArgsEscapingDisabled = {"--escape-mapping-column-names", "false"};
+
+    // escaping enabled but mapping not provided
+    String [] extraArgsEscapingWithoutMapping = {};
+
+    checkRecordWithExtraArgs(extraArgsEscapeColNamesWithMapping, "TABLE1");
+    checkRecordWithExtraArgs(extraArgsEscapingDisabled, "TABLE2");
+    checkRecordWithExtraArgs(extraArgsEscapingWithoutMapping, "TABLE3");
+  }
+
+  private void checkRecordWithExtraArgs(String[] extraArgs, String tableName) throws IOException {
+    String date = "2017-01-19";
+    String timeStamp = "2017-01-19 14:47:57.112000";
+
+    String [] names = {"INTFIELD1", "DATA_#_COL0", "DATA#COL1", "DATA___COL2"};
+    String [] types = { "INT", "DATE", "TIMESTAMP", "DECIMAL(2,20)" };
+    String [] vals = {"1", "{ts \'" + date + "\'}", "{ts \'" + timeStamp + "\'}", "2e20"};
+
+    String [] checkNames =  {"INTFIELD1", "DATA___COL0", "DATA_COL1", "DATA___COL2"};
+
+    setCurTableName(tableName);
+
+    createTableWithColTypesAndNames(names, types, vals);
+    runImport(getOutputArgv(true, extraArgs));
+
+    Path outputFile = new Path(getTablePath(), "part-m-00000.avro");
+    DataFileReader<GenericRecord> reader = read(outputFile);
+    GenericRecord record = reader.next();
+
+    for (String columnName : checkNames) {
+      assertNotNull(record.get(columnName));
+    }
+
+    removeTableDir();
   }
 
   protected DataFileReader<GenericRecord> read(Path filename) throws IOException {
diff --git a/src/test/com/cloudera/sqoop/TestSqoopOptions.java b/src/test/com/cloudera/sqoop/TestSqoopOptions.java
index 0b33b24..dbdd2f1 100644
--- a/src/test/com/cloudera/sqoop/TestSqoopOptions.java
+++ b/src/test/com/cloudera/sqoop/TestSqoopOptions.java
@@ -20,21 +20,18 @@
 
 import java.util.Properties;
 
-import com.cloudera.sqoop.tool.BaseSqoopTool;
-
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.sqoop.manager.oracle.OracleUtils;
-
-import com.cloudera.sqoop.lib.DelimiterSet;
-import com.cloudera.sqoop.tool.ImportTool;
-import com.cloudera.sqoop.testutil.HsqldbTestServer;
-import org.junit.Before;
 import org.junit.After;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
+
+import com.cloudera.sqoop.lib.DelimiterSet;
+import com.cloudera.sqoop.testutil.HsqldbTestServer;
+import com.cloudera.sqoop.tool.BaseSqoopTool;
+import com.cloudera.sqoop.tool.ImportTool;
 
 import static org.apache.sqoop.Sqoop.SQOOP_RETHROW_PROPERTY;
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -782,4 +779,19 @@
     validateImportOptions(extraArgs);
   }
 
+  @Test
+  public void testEscapeMappingColumnNames() throws Exception {
+    SqoopOptions opts = new SqoopOptions();
+    // enabled by default
+    assertTrue(opts.getEscapeMappingColumnNamesEnabled());
+
+    String [] args = {
+        "--" + org.apache.sqoop.tool.BaseSqoopTool.ESCAPE_MAPPING_COLUMN_NAMES_ENABLED,
+        "false",
+    };
+
+    opts = parse(args);
+    assertFalse(opts.getEscapeMappingColumnNamesEnabled());
+  }
+
 }