SQOOP-2164: Enhance the Netezza Connector for Sqoop

(Venkat Ranganathan via Abraham Elmahrek)
diff --git a/src/docs/user/connectors.txt b/src/docs/user/connectors.txt
index fee40d9..496d3cf 100644
--- a/src/docs/user/connectors.txt
+++ b/src/docs/user/connectors.txt
@@ -389,8 +389,23 @@
                                       Default value is 1.
 +--log-dir+                           Applicable only in direct mode.\
                                       Specifies the directory where Netezza\
-                                      external table operation logs are stored.\
-                                      Default value is /tmp.
+                                      external table operation logs are stored\
+                                      on the Hadoop filesystem.  Logs are\
+                                      stored under this directory, with one\
+                                      directory per job and sub-directories\
+                                      for each task number and attempt.\
+                                      Default value is the user home directory.
++--trunc-string+                      Applicable only in direct mode.\
+                                      Specifies whether strings are truncated\
+                                      to the declared storage size before\
+                                      being loaded. By default, truncation\
+                                      of strings is reported as an\
+                                      error.
++--ctrl-chars+                        Applicable only in direct mode.\
+                                      Specifies whether control characters\
+                                      (ASCII characters 1 - 31) are allowed\
+                                      in char/nchar/varchar/nvarchar\
+                                      columns.  Default is false.
 --------------------------------------------------------------------------------
 
 
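For illustration only (not part of the patch itself), a minimal Java sketch of how the two new flags behave: they are recorded as the boolean configuration keys introduced in DirectNetezzaManager below, and the direct-mode mappers turn them into CTRLCHARS/TRUNCSTRING clauses of the generated external table statement. The key names match the patch; everything else is made up for the example.

    import org.apache.hadoop.conf.Configuration;

    public class NetezzaExternalTableOptionsExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // "--ctrl-chars" and "--trunc-string" end up as these boolean keys.
        conf.setBoolean("netezza.ctrl.chars", true);
        conf.setBoolean("netezza.trunc.string", true);

        // The mappers append the matching clauses to the generated
        // CREATE EXTERNAL TABLE options when the keys are set.
        StringBuilder options = new StringBuilder();
        if (conf.getBoolean("netezza.ctrl.chars", false)) {
          options.append(" CTRLCHARS TRUE ");
        }
        if (conf.getBoolean("netezza.trunc.string", false)) {
          options.append(" TRUNCSTRING TRUE ");
        }
        System.out.println(options); // " CTRLCHARS TRUE  TRUNCSTRING TRUE "
      }
    }
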
diff --git a/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java b/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java
index 9ca8f63..daddb8c 100644
--- a/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java
+++ b/src/java/org/apache/sqoop/manager/DirectNetezzaManager.java
@@ -33,6 +33,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.sqoop.mapreduce.netezza.NetezzaExternalTableExportJob;
 import org.apache.sqoop.mapreduce.netezza.NetezzaExternalTableImportJob;
 
@@ -58,11 +59,24 @@
   public static final String NETEZZA_ERROR_THRESHOLD_LONG_ARG =
       "max-errors";
 
+  public static final String NETEZZA_CTRL_CHARS_OPT =
+      "netezza.ctrl.chars";
+  public static final String NETEZZA_CTRL_CHARS_LONG_ARG =
+      "ctrl-chars";
+
+  public static final String NETEZZA_TRUNC_STRING_OPT =
+      "netezza.trunc.string";
+  public static final String NETEZZA_TRUNC_STRING_LONG_ARG =
+      "trunc-string";
+
+
+
   private static final String QUERY_CHECK_DICTIONARY_FOR_TABLE =
       "SELECT 1 FROM _V_TABLE WHERE OWNER= ? "
       + " AND TABLENAME = ?";
   public static final String NETEZZA_NULL_VALUE =
       "netezza.exttable.null.value";
+
   public DirectNetezzaManager(SqoopOptions opts) {
     super(opts);
     try {
@@ -159,7 +173,7 @@
     options = context.getOptions();
     context.setConnManager(this);
 
-    checkTable(); // Throws excpetion as necessary
+    checkTable(); // Throws exception as necessary
     NetezzaExternalTableExportJob exporter = null;
 
     char qc = (char) options.getInputEnclosedBy();
@@ -248,6 +262,12 @@
     netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_LOG_DIR_OPT)
         .hasArg().withDescription("Netezza log directory")
         .withLongOpt(NETEZZA_LOG_DIR_LONG_ARG).create());
+    netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_CTRL_CHARS_OPT)
+        .withDescription("Allow control chars in data")
+        .withLongOpt(NETEZZA_CTRL_CHARS_LONG_ARG).create());
+    netezzaOpts.addOption(OptionBuilder.withArgName(NETEZZA_TRUNC_STRING_OPT)
+        .withDescription("Truncate strings to the declared storage size")
+        .withLongOpt(NETEZZA_TRUNC_STRING_LONG_ARG).create());
     return netezzaOpts;
   }
 
@@ -270,6 +290,12 @@
       conf.set(NETEZZA_LOG_DIR_OPT, dir);
     }
 
+    conf.setBoolean(NETEZZA_CTRL_CHARS_OPT,
+      cmdLine.hasOption(NETEZZA_CTRL_CHARS_LONG_ARG));
+
+    conf.setBoolean(NETEZZA_TRUNC_STRING_OPT,
+      cmdLine.hasOption(NETEZZA_TRUNC_STRING_LONG_ARG));
+
     // Always true for Netezza direct mode access
     conf.setBoolean(NETEZZA_DATASLICE_ALIGNED_ACCESS_OPT, true);
   }
@@ -291,4 +317,15 @@
   public boolean isDirectModeHCatSupported() {
     return true;
   }
+
+
+  public static String getLocalLogDir(TaskAttemptID attemptId) {
+    int tid = attemptId.getTaskID().getId();
+    int aid = attemptId.getId();
+    String jid = attemptId.getJobID().toString();
+    StringBuilder sb = new StringBuilder(jid).append('-');
+    sb.append(tid).append('-').append(aid);
+    String localLogDir = sb.toString();
+    return localLogDir;
+  }
 }
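
A small sketch of the per-attempt directory name produced by getLocalLogDir(); the TaskAttemptID values are invented for the example and assume Hadoop's org.apache.hadoop.mapreduce API:

    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.TaskType;
    import org.apache.sqoop.manager.DirectNetezzaManager;

    public class LocalLogDirExample {
      public static void main(String[] args) {
        // Hypothetical attempt 0 of map task 3 of job job_201501010000_0001.
        TaskAttemptID attempt =
            new TaskAttemptID("201501010000", 1, TaskType.MAP, 3, 0);
        // Prints "job_201501010000_0001-3-0", i.e. <job id>-<task>-<attempt>.
        System.out.println(DirectNetezzaManager.getLocalLogDir(attempt));
      }
    }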
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java
index 3613ff2..f377fb9 100644
--- a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java
@@ -20,15 +20,21 @@
 
 import java.io.BufferedOutputStream;
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.sql.Connection;
 import java.sql.SQLException;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.sqoop.io.NamedFifo;
@@ -36,6 +42,7 @@
 import org.apache.sqoop.manager.DirectNetezzaManager;
 import org.apache.sqoop.mapreduce.SqoopMapper;
 import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.apache.sqoop.util.FileUploader;
 import org.apache.sqoop.util.PerfCounters;
 import org.apache.sqoop.util.TaskId;
 
@@ -61,6 +68,9 @@
   private NetezzaJDBCStatementRunner extTableThread;
   private PerfCounters counter;
   private DelimiterSet outputDelimiters;
+  private String localLogDir = null;
+  private String logDir = null;
+  private File taskAttemptDir = null;
 
   private String getSqlStatement(DelimiterSet delimiters) throws IOException {
 
@@ -72,7 +82,11 @@
 
     int errorThreshold = conf.getInt(
       DirectNetezzaManager.NETEZZA_ERROR_THRESHOLD_OPT, 1);
-    String logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT);
+
+    boolean ctrlChars =
+        conf.getBoolean(DirectNetezzaManager.NETEZZA_CTRL_CHARS_OPT, false);
+    boolean truncString =
+        conf.getBoolean(DirectNetezzaManager.NETEZZA_TRUNC_STRING_OPT, false);
 
     StringBuilder sqlStmt = new StringBuilder(2048);
 
@@ -83,6 +97,12 @@
     sqlStmt.append("' USING (REMOTESOURCE 'JDBC' ");
     sqlStmt.append(" BOOLSTYLE 'TRUE_FALSE' ");
     sqlStmt.append(" CRINSTRING FALSE ");
+    if (ctrlChars) {
+      sqlStmt.append(" CTRLCHARS TRUE ");
+    }
+    if (truncString) {
+      sqlStmt.append(" TRUNCSTRING TRUE ");
+    }
     sqlStmt.append(" DELIMITER ");
     sqlStmt.append(Integer.toString(fd));
     sqlStmt.append(" ENCODING 'internal' ");
@@ -112,19 +132,18 @@
     }
     sqlStmt.append(" MAXERRORS ").append(errorThreshold);
 
-    if (logDir != null) {
-      logDir = logDir.trim();
-      if (logDir.length() > 0) {
-        File logDirPath = new File(logDir);
-        logDirPath.mkdirs();
-        if (logDirPath.canWrite() && logDirPath.isDirectory()) {
-          sqlStmt.append(" LOGDIR ").append(logDir).append(' ');
-        } else {
-          throw new IOException("Unable to create log directory specified");
-        }
-      }
-    }
-    sqlStmt.append(")");
+
+    File logDirPath = new File(taskAttemptDir, localLogDir);
+    logDirPath.mkdirs();
+    if (logDirPath.canWrite() && logDirPath.isDirectory()) {
+      sqlStmt.append(" LOGDIR ")
+        .append(logDirPath.getAbsolutePath()).append(' ');
+    } else {
+      throw new IOException(
+        "Unable to create the specified log directory");
+    }
+
+    sqlStmt.append(")");
 
     String stmt = sqlStmt.toString();
     LOG.debug("SQL generated for external table export" + stmt);
@@ -135,6 +154,12 @@
   private void initNetezzaExternalTableExport(Context context)
     throws IOException {
     this.conf = context.getConfiguration();
+
+    taskAttemptDir = TaskId.getLocalWorkPath(conf);
+    localLogDir =
+        DirectNetezzaManager.getLocalLogDir(context.getTaskAttemptID());
+    logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT);
+
     dbc = new DBConfiguration(conf);
     File taskAttemptDir = TaskId.getLocalWorkPath(conf);
 
@@ -212,6 +237,9 @@
           throw new IOException(extTableThread.getException());
         }
       }
+      FileUploader.uploadFilesToDFS(taskAttemptDir.getAbsolutePath(),
+        localLogDir, logDir, context.getJobID().toString(),
+        conf);
     }
   }
 
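As a rough sketch of where the external table logs land after the FileUploader call above: the per-attempt directory is copied under <log-dir>/<job id>/ on the Hadoop filesystem. The concrete values below are placeholders, not produced by the patch:

    import org.apache.hadoop.fs.Path;

    public class LogLayoutExample {
      public static void main(String[] args) {
        String logDir = "/user/sqoop/nzlogs";      // --log-dir (or user home)
        String jobId = "job_201501010000_0001";    // context.getJobID()
        String localLogDir = jobId + "-3-0";       // getLocalLogDir(attempt)
        // FileUploader copies the local attempt directory under logDir/jobId.
        Path target = new Path(new Path(logDir, jobId), localLogDir);
        // Prints /user/sqoop/nzlogs/job_201501010000_0001/job_201501010000_0001-3-0
        System.out.println(target);
      }
    }
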
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java
index 2f4c152..486ba7c 100644
--- a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java
@@ -21,14 +21,19 @@
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.sql.Connection;
 import java.sql.SQLException;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.sqoop.config.ConfigurationHelper;
@@ -36,6 +41,7 @@
 import org.apache.sqoop.lib.DelimiterSet;
 import org.apache.sqoop.manager.DirectNetezzaManager;
 import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.apache.sqoop.util.FileUploader;
 import org.apache.sqoop.util.PerfCounters;
 import org.apache.sqoop.util.TaskId;
 
@@ -61,7 +67,9 @@
     .getLog(NetezzaExternalTableImportMapper.class.getName());
   private NetezzaJDBCStatementRunner extTableThread;
   private PerfCounters counter;
-
+  private String localLogDir = null;
+  private String logDir = null;
+  private File taskAttemptDir = null;
   private String getSqlStatement(int myId) throws IOException {
 
     char fd = (char) conf.getInt(DelimiterSet.OUTPUT_FIELD_DELIM_KEY, ',');
@@ -70,6 +78,11 @@
 
     String nullValue = conf.get(DirectNetezzaManager.NETEZZA_NULL_VALUE);
 
+    boolean ctrlChars =
+        conf.getBoolean(DirectNetezzaManager.NETEZZA_CTRL_CHARS_OPT, false);
+    boolean truncString =
+        conf.getBoolean(DirectNetezzaManager.NETEZZA_TRUNC_STRING_OPT, false);
+
     int errorThreshold = conf.getInt(
       DirectNetezzaManager.NETEZZA_ERROR_THRESHOLD_OPT, 1);
     String logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT);
@@ -82,6 +95,12 @@
     sqlStmt.append("' USING (REMOTESOURCE 'JDBC' ");
     sqlStmt.append(" BOOLSTYLE 'T_F' ");
     sqlStmt.append(" CRINSTRING FALSE ");
+    if (ctrlChars) {
+      sqlStmt.append(" CTRLCHARS TRUE ");
+    }
+    if (truncString) {
+      sqlStmt.append(" TRUNCSTRING TRUE ");
+    }
     sqlStmt.append(" DELIMITER ");
     sqlStmt.append(Integer.toString(fd));
     sqlStmt.append(" ENCODING 'internal' ");
@@ -112,17 +131,12 @@
 
     sqlStmt.append(" MAXERRORS ").append(errorThreshold);
 
-    if (logDir != null) {
-      logDir = logDir.trim();
-      if (logDir.length() > 0) {
-        File logDirPath = new File(logDir);
-        logDirPath.mkdirs();
-        if (logDirPath.canWrite() && logDirPath.isDirectory()) {
-          sqlStmt.append(" LOGDIR ").append(logDir).append(' ');
-        } else {
-          throw new IOException("Unable to create log directory specified");
-        }
-      }
+    File logDirPath = new File(taskAttemptDir, localLogDir);
+    logDirPath.mkdirs();
+    if (logDirPath.canWrite() && logDirPath.isDirectory()) {
+      sqlStmt.append(" LOGDIR ").append(logDirPath.getAbsolutePath()).append(' ');
+    } else {
+      throw new IOException("Unable to create the specified log directory");
     }
 
     sqlStmt.append(") AS SELECT ");
@@ -149,7 +163,7 @@
 
   private void initNetezzaExternalTableImport(int myId) throws IOException {
 
-    File taskAttemptDir = TaskId.getLocalWorkPath(conf);
+    taskAttemptDir = TaskId.getLocalWorkPath(conf);
 
     this.fifoFile = new File(taskAttemptDir, ("nzexttable-" + myId + ".txt"));
     String filename = fifoFile.toString();
@@ -199,6 +213,10 @@
   public void map(Integer dataSliceId, NullWritable val, Context context)
     throws IOException, InterruptedException {
     conf = context.getConfiguration();
+    localLogDir =
+        DirectNetezzaManager.getLocalLogDir(context.getTaskAttemptID());
+    logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT);
+
     dbc = new DBConfiguration(conf);
     numMappers = ConfigurationHelper.getConfNumMaps(conf);
     char rd = (char) conf.getInt(DelimiterSet.OUTPUT_RECORD_DELIM_KEY, '\n');
@@ -232,7 +250,11 @@
           extTableThread.printException();
           throw new IOException(extTableThread.getException());
         }
+        FileUploader.uploadFilesToDFS(taskAttemptDir.getAbsolutePath(),
+          localLogDir, logDir, context.getJobID().toString(),
+          conf);
       }
     }
   }
+
 }
diff --git a/src/java/org/apache/sqoop/util/FileUploader.java b/src/java/org/apache/sqoop/util/FileUploader.java
new file mode 100644
index 0000000..155cffc
--- /dev/null
+++ b/src/java/org/apache/sqoop/util/FileUploader.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.util;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class FileUploader {
+  public static final Log LOG =
+        LogFactory.getLog(FileUploader.class.getName());
+
+  private FileUploader() { }
+
+  public static void uploadFilesToDFS(String srcBasePath, String src,
+    String destBasePath, String dest, Configuration conf) throws IOException {
+
+    FileSystem fs = FileSystem.get(conf);
+    Path targetPath = null;
+    Path srcPath = new Path(srcBasePath, src);
+
+    if (destBasePath == null || destBasePath.length() == 0) {
+      destBasePath = ".";
+    }
+
+    targetPath = new Path(destBasePath, dest);
+
+    if (!fs.exists(targetPath)) {
+      fs.mkdirs(targetPath);
+    }
+
+    Path targetPath2 = new Path(targetPath, src);
+    fs.delete(targetPath2, true);
+
+    try {
+      LOG.info("Copying " + srcPath + " to " + targetPath);
+      // Copy srcPath (on the local FS) to targetPath on DFS.
+      // The first boolean arg (delSrc=false) keeps the local source; the
+      // second (overwrite=true) overwrites the destination if it exists.
+      fs.copyFromLocalFile(false, true, srcPath, targetPath);
+    } catch (IOException ioe) {
+      LOG.warn("Unable to copy " + srcPath + " to " + targetPath, ioe);
+    }
+  }
+
+}
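
A minimal usage sketch of the new helper, mirroring the call made from the direct-mode mappers; the paths and job id are placeholders and it assumes the default filesystem in the Configuration is reachable:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.sqoop.util.FileUploader;

    public class FileUploaderExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholders for the task's local work dir, the per-attempt log dir
        // name, the --log-dir value and the MapReduce job id.
        String taskAttemptDir = "/tmp/sqoop-task-work";
        String localLogDir = "job_201501010000_0001-3-0";
        String logDir = "/user/sqoop/nzlogs";
        String jobId = "job_201501010000_0001";
        // Copies taskAttemptDir/localLogDir from the local FS to
        // logDir/jobId/ on the default (distributed) filesystem.
        FileUploader.uploadFilesToDFS(taskAttemptDir, localLogDir,
            logDir, jobId, conf);
      }
    }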
diff --git a/src/test/org/apache/sqoop/manager/netezza/DirectNetezzaExportManualTest.java b/src/test/org/apache/sqoop/manager/netezza/DirectNetezzaExportManualTest.java
index f7b68ed..92012c4 100644
--- a/src/test/org/apache/sqoop/manager/netezza/DirectNetezzaExportManualTest.java
+++ b/src/test/org/apache/sqoop/manager/netezza/DirectNetezzaExportManualTest.java
@@ -155,6 +155,8 @@
         "--",
         "--log-dir", "/tmp",
         "--max-errors", "2",
+        "--trunc-string",
+        "--ctrl-chars"
      };
     String[] argv = getArgv(true, 10, 10, extraArgs);
     runNetezzaTest(getTableName(), argv);