SQOOP-2370: Netezza - need to support additional options for full control character handling
   (Venkat Ranganathan)
diff --git a/src/docs/user/connectors.txt b/src/docs/user/connectors.txt
index 496d3cf..c5ce4d6 100644
--- a/src/docs/user/connectors.txt
+++ b/src/docs/user/connectors.txt
@@ -381,31 +381,47 @@
                                       of data slices of a table or all\
                                       Default is "false" for standard mode\
                                       and "true" for direct mode.
-+--max-errors+                        Applicable only in direct mode.\
++--max-errors+                        Applicable only for direct mode export.\
                                       This option specifies the error threshold\
                                       per mapper while transferring data. If\
                                       the number of errors encountered exceed\
                                       this threshold then the job will fail.
                                       Default value is 1.
-+--log-dir+                           Applicable only in direct mode.\
++--log-dir+                           Applicable only for direct mode export.\
                                       Specifies the directory where Netezza\
                                       external table operation logs are stored\
                                       on the hadoop filesystem.  Logs are\
                                       stored under this directory with one\
                                       directory for the job and sub-directories\
                                       for each task number and attempt.\
-                                      Default value is the user home directory.
-+--trunc-string+                      Applicable only in direct mode.\
+                                      Default value is the user home directory.\
+                                      The nzlog and nzbad files will be under
+                                      (logdir)/job-id/job-attempt-id.
++--trunc-string+                      Applicable only for direct mode export.\
                                       Specifies whether the system \
                                       truncates strings to the declared\
                                       storage and loads the data. By default\
                                       truncation of strings is reported as an\
                                       error.
-+--ctrl-chars+                        Applicable only in direct mode.\
++--ctrl-chars+                        Applicable only for direct mode export.\
                                       Specifies whether control characters \
                                       (ASCII chars 1 - 31) can be allowed \
                                       to be part of char/nchar/varchar/nvarchar\
                                       columns.  Default is false.
++--crin-string+                       Applicable only for direct mode export.\
+                                      Specifies whether carriage return \
+                                      (ASCII char 13) can be allowed \
+                                      to be part of char/nchar/varchar/nvarchar\
+                                      columns.  Note that CR can no longer \
+                                      be a record delimiter with this option.\
+                                      Default is false.
++--ignore-zero+                       Applicable only for direct mode export.\
+                                      Specifies whether NUL character \
+                                      (ASCII char 0) should be scanned \
+                                      and ignored as part of the data loaded\
+                                      into char/nchar/varchar/nvarchar \
+                                      columns.\
+                                      Default is false.
 --------------------------------------------------------------------------------
 
 
diff --git a/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java b/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java
index 06fa976..2ec0770 100644
--- a/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java
+++ b/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java
@@ -64,6 +64,18 @@
   public static final String NETEZZA_CTRL_CHARS_LONG_ARG =
       "ctrl-chars";
 
+
+  public static final String NETEZZA_CRIN_STRING_OPT =
+      "netezza.crin.string";
+  public static final String NETEZZA_CRIN_STRING_LONG_ARG =
+      "crin-string";
+
+
+  public static final String NETEZZA_IGNORE_ZERO_OPT =
+      "netezza.ignore.zero";
+  public static final String NETEZZA_IGNORE_ZERO_LONG_ARG =
+      "ignore-zero";
+
   public static final String NETEZZA_TRUNC_STRING_OPT =
       "netezza.trunc.string";
   public static final String NETEZZA_TRUNC_STRING_LONG_ARG =
@@ -268,6 +280,12 @@
     netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_TRUNC_STRING_OPT)
       .withDescription("Truncate string to declared storage size")
       .withLongOpt(NETEZZA_TRUNC_STRING_LONG_ARG).create());
+    netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_CRIN_STRING_OPT)
+      .withDescription("Truncate string to declared storage size")
+      .withLongOpt(NETEZZA_CRIN_STRING_LONG_ARG).create());
+    netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_IGNORE_ZERO_OPT)
+      .withDescription("Truncate string to declared storage size")
+      .withLongOpt(NETEZZA_IGNORE_ZERO_LONG_ARG).create());
     return netezzaOpts;
   }
 
@@ -296,6 +314,12 @@
     conf.setBoolean(NETEZZA_TRUNC_STRING_OPT,
       cmdLine.hasOption(NETEZZA_TRUNC_STRING_LONG_ARG));
 
+    conf.setBoolean(NETEZZA_CRIN_STRING_OPT,
+      cmdLine.hasOption(NETEZZA_CRIN_STRING_LONG_ARG));
+
+    conf.setBoolean(NETEZZA_IGNORE_ZERO_OPT,
+      cmdLine.hasOption(NETEZZA_IGNORE_ZERO_LONG_ARG));
+
     // Always true for Netezza direct mode access
     conf.setBoolean(NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT, true);
   }
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java
index f377fb9..aa058d1 100644
--- a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java
@@ -87,7 +87,10 @@
         conf.getBoolean(DirectNetezzaManager.NETEZZA_CTRL_CHARS_OPT, false);
     boolean truncString =
         conf.getBoolean(DirectNetezzaManager.NETEZZA_TRUNC_STRING_OPT, false);
-
+    boolean ignoreZero =
+        conf.getBoolean(DirectNetezzaManager.NETEZZA_IGNORE_ZERO_OPT, false);
+    boolean crinString =
+        conf.getBoolean(DirectNetezzaManager.NETEZZA_CRIN_STRING_OPT, false);
     StringBuilder sqlStmt = new StringBuilder(2048);
 
     sqlStmt.append("INSERT INTO ");
@@ -96,13 +99,20 @@
     sqlStmt.append(fifoFile.getAbsolutePath());
     sqlStmt.append("' USING (REMOTESOURCE 'JDBC' ");
     sqlStmt.append(" BOOLSTYLE 'TRUE_FALSE' ");
-    sqlStmt.append(" CRINSTRING FALSE ");
+    if (crinString) {
+      sqlStmt.append(" CRINSTRING TRUE ");
+    } else {
+      sqlStmt.append(" CRINSTRING FALSE ");
+    }
     if (ctrlChars) {
       sqlStmt.append(" CTRLCHARS TRUE ");
     }
     if (truncString) {
       sqlStmt.append(" TRUNCSTRING TRUE ");
     }
+    if (ignoreZero) {
+      sqlStmt.append(" IGNOREZERO TRUE ");
+    }
     sqlStmt.append(" DELIMITER ");
     sqlStmt.append(Integer.toString(fd));
     sqlStmt.append(" ENCODING 'internal' ");
@@ -228,18 +238,24 @@
         }
         cleanup(context);
       } finally {
-        recordWriter.close();
-        extTableThread.join();
+        try {
+          recordWriter.close();
+          extTableThread.join();
+        } catch (Exception e) {
+          LOG.debug("Exception cleaning up mapper operation : " + e.getMessage());
+        }
         counter.stopClock();
         LOG.info("Transferred " + counter.toString());
+        FileUploader.uploadFilesToDFS(taskAttemptDir.getAbsolutePath(),
+          localLogDir, logDir, context.getJobID().toString(),
+          conf);
+
         if (extTableThread.hasExceptions()) {
           extTableThread.printException();
           throw new IOException(extTableThread.getException());
         }
       }
-      FileUploader.uploadFilesToDFS(taskAttemptDir.getAbsolutePath(),
-        localLogDir, logDir, context.getJobID().toString(),
-        conf);
+
     }
   }