SQOOP-3378: Error during direct Netezza import/export can interrupt process in uncontrolled ways

(Daniel Voros via Szabolcs Vasas)
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java
index 5bf2188..6dbb98d 100644
--- a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java
@@ -18,26 +18,14 @@
 
 package org.apache.sqoop.mapreduce.db.netezza;
 
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.sql.Connection;
-import java.sql.SQLException;
-
-import org.apache.commons.io.IOUtils;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.sqoop.io.NamedFifo;
+import org.apache.sqoop.lib.DelimiterSet;
 import org.apache.sqoop.lib.SqoopRecord;
 import org.apache.sqoop.manager.DirectNetezzaManager;
 import org.apache.sqoop.mapreduce.SqoopMapper;
@@ -46,7 +34,14 @@
 import org.apache.sqoop.util.PerfCounters;
 import org.apache.sqoop.util.TaskId;
 
-import org.apache.sqoop.lib.DelimiterSet;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Netezza export mapper using external tables.
@@ -59,8 +54,10 @@
    */
 
   private Configuration conf;
-  private DBConfiguration dbc;
-  private File fifoFile;
+  @VisibleForTesting
+  DBConfiguration dbc;
+  @VisibleForTesting
+  File fifoFile;
   private Connection con;
   private OutputStream recordWriter;
   public static final Log LOG = LogFactory
@@ -69,8 +66,12 @@
   private PerfCounters counter;
   private DelimiterSet outputDelimiters;
   private String localLogDir = null;
-  private String logDir = null;
-  private File taskAttemptDir = null;
+  @VisibleForTesting
+  String logDir = null;
+  @VisibleForTesting
+  File taskAttemptDir = null;
+
+  private AtomicBoolean jdbcFailed = new AtomicBoolean(false);
 
   private String getSqlStatement(DelimiterSet delimiters) throws IOException {
 
@@ -168,9 +169,13 @@
     taskAttemptDir = TaskId.getLocalWorkPath(conf);
     localLogDir =
         DirectNetezzaManager.getLocalLogDir(context.getTaskAttemptID());
-    logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT);
+    if (logDir == null) { // need to be able to set in tests
+      logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT);
+    }
 
-    dbc = new DBConfiguration(conf);
+    if (dbc == null) { // need to be able to mock in tests
+      dbc = new DBConfiguration(conf);
+    }
     File taskAttemptDir = TaskId.getLocalWorkPath(conf);
 
     char fd = (char) conf.getInt(DelimiterSet.INPUT_FIELD_DELIM_KEY, ',');
@@ -196,7 +201,7 @@
     boolean cleanup = false;
     try {
       con = dbc.getConnection();
-      extTableThread = new NetezzaJDBCStatementRunner(Thread.currentThread(),
+      extTableThread = new NetezzaJDBCStatementRunner(jdbcFailed,
         con, sqlStmt);
     } catch (SQLException sqle) {
       cleanup = true;
@@ -226,49 +231,43 @@
   public void run(Context context) throws IOException, InterruptedException {
     setup(context);
     initNetezzaExternalTableExport(context);
-    if (extTableThread.isAlive()) {
-      try {
-        while (context.nextKeyValue()) {
-          if (Thread.interrupted()) {
-            if (!extTableThread.isAlive()) {
-              break;
-            }
-          }
-          map(context.getCurrentKey(), context.getCurrentValue(), context);
+    try {
+      while (context.nextKeyValue()) {
+        // Fail fast if there was an error during JDBC operation
+        if (jdbcFailed.get()) {
+          break;
         }
-        cleanup(context);
-      } finally {
-        try {
-          recordWriter.close();
-          extTableThread.join();
-        } catch (Exception e) {
-          LOG.debug("Exception cleaning up mapper operation : " + e.getMessage());
-        }
-        counter.stopClock();
-        LOG.info("Transferred " + counter.toString());
-        FileUploader.uploadFilesToDFS(taskAttemptDir.getAbsolutePath(),
-          localLogDir, logDir, context.getJobID().toString(),
-          conf);
-
-        if (extTableThread.hasExceptions()) {
-          extTableThread.printException();
-          throw new IOException(extTableThread.getException());
-        }
+        map(context.getCurrentKey(), context.getCurrentValue(), context);
       }
+      cleanup(context);
+    } finally {
+      try {
+        recordWriter.close();
+        extTableThread.join();
+      } catch (Exception e) {
+        LOG.debug("Exception cleaning up mapper operation : " + e.getMessage());
+      }
+      counter.stopClock();
+      LOG.info("Transferred " + counter.toString());
+      FileUploader.uploadFilesToDFS(taskAttemptDir.getAbsolutePath(),
+        localLogDir, logDir, context.getJobID().toString(),
+        conf);
 
+      if (extTableThread.hasExceptions()) {
+        extTableThread.printException();
+        throw new IOException(extTableThread.getException());
+      }
     }
   }
 
-  protected void writeTextRecord(Text record) throws IOException,
-    InterruptedException {
+  protected void writeTextRecord(Text record) throws IOException {
     String outputStr = record.toString() + "\n";
     byte[] outputBytes = outputStr.getBytes("UTF-8");
     counter.addBytes(outputBytes.length);
     recordWriter.write(outputBytes, 0, outputBytes.length);
   }
 
-  protected void writeSqoopRecord(SqoopRecord sqr) throws IOException,
-    InterruptedException {
+  protected void writeSqoopRecord(SqoopRecord sqr) throws IOException {
     String outputStr = sqr.toString(this.outputDelimiters);
     byte[] outputBytes = outputStr.getBytes("UTF-8");
     counter.addBytes(outputBytes.length);
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java
index 306062a..3124b17 100644
--- a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java
@@ -18,22 +18,10 @@
 
 package org.apache.sqoop.mapreduce.db.netezza;
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.sql.Connection;
-import java.sql.SQLException;
-
-import org.apache.commons.io.IOUtils;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.sqoop.config.ConfigurationHelper;
@@ -42,10 +30,18 @@
 import org.apache.sqoop.manager.DirectNetezzaManager;
 import org.apache.sqoop.mapreduce.AutoProgressMapper;
 import org.apache.sqoop.mapreduce.db.DBConfiguration;
-import org.apache.sqoop.util.FileUploader;
 import org.apache.sqoop.util.PerfCounters;
 import org.apache.sqoop.util.TaskId;
 
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 /**
  * Netezza import mapper using external tables.
  */
@@ -57,8 +53,10 @@
    */
 
   private Configuration conf;
-  private DBConfiguration dbc;
-  private File fifoFile;
+  @VisibleForTesting
+  DBConfiguration dbc;
+  @VisibleForTesting
+  File fifoFile;
   private int numMappers;
   private Connection con;
   private BufferedReader recordReader;
@@ -66,7 +64,11 @@
     .getLog(NetezzaExternalTableImportMapper.class.getName());
   private NetezzaJDBCStatementRunner extTableThread;
   private PerfCounters counter;
-  private File taskAttemptDir = null;
+  @VisibleForTesting
+  File taskAttemptDir = null;
+
+  private AtomicBoolean jdbcFailed = new AtomicBoolean(false);
+
   private String getSqlStatement(int myId) throws IOException {
 
     char fd = (char) conf.getInt(DelimiterSet.OUTPUT_FIELD_DELIM_KEY, ',');
@@ -143,8 +145,9 @@
 
   private void initNetezzaExternalTableImport(int myId) throws IOException {
 
-    taskAttemptDir = TaskId.getLocalWorkPath(conf);
-
+    if (taskAttemptDir == null) {
+      taskAttemptDir = TaskId.getLocalWorkPath(conf);
+    }
     this.fifoFile = new File(taskAttemptDir, ("nzexttable-" + myId + ".txt"));
     String filename = fifoFile.toString();
     NamedFifo nf;
@@ -163,7 +166,7 @@
     boolean cleanup = false;
     try {
       con = dbc.getConnection();
-      extTableThread = new NetezzaJDBCStatementRunner(Thread.currentThread(),
+      extTableThread = new NetezzaJDBCStatementRunner(jdbcFailed,
         con, sqlStmt);
     } catch (SQLException sqle) {
       cleanup = true;
@@ -197,39 +200,38 @@
     conf = context.getConfiguration();
 
 
-    dbc = new DBConfiguration(conf);
+    if (dbc == null) { // need to be able to mock in tests
+      dbc = new DBConfiguration(conf);
+    }
     numMappers = ConfigurationHelper.getConfNumMaps(conf);
     char rd = (char) conf.getInt(DelimiterSet.OUTPUT_RECORD_DELIM_KEY, '\n');
     initNetezzaExternalTableImport(dataSliceId);
     counter = new PerfCounters();
     counter.startClock();
     Text outputRecord = new Text();
-    if (extTableThread.isAlive()) {
-      try {
-        String inputRecord = recordReader.readLine();
-        while (inputRecord != null) {
-          if (Thread.interrupted()) {
-            if (!extTableThread.isAlive()) {
-              break;
-            }
-          }
-          outputRecord.set(inputRecord + rd);
-          // May be we should set the output to be String for faster performance
-          // There is no real benefit in changing it to Text and then
-          // converting it back in our case
-          writeRecord(outputRecord, context);
-          counter.addBytes(1 + inputRecord.length());
-          inputRecord = recordReader.readLine();
+    try {
+      String inputRecord = recordReader.readLine();
+      while (inputRecord != null) {
+        // Fail fast if there was an error during JDBC operation
+        if (jdbcFailed.get()) {
+          break;
         }
-      } finally {
-        recordReader.close();
-        extTableThread.join();
-        counter.stopClock();
-        LOG.info("Transferred " + counter.toString());
-        if (extTableThread.hasExceptions()) {
-          extTableThread.printException();
-          throw new IOException(extTableThread.getException());
-        }
+        outputRecord.set(inputRecord + rd);
+        // May be we should set the output to be String for faster performance
+        // There is no real benefit in changing it to Text and then
+        // converting it back in our case
+        writeRecord(outputRecord, context);
+        counter.addBytes(1 + inputRecord.length());
+        inputRecord = recordReader.readLine();
+      }
+    } finally {
+      recordReader.close();
+      extTableThread.join();
+      counter.stopClock();
+      LOG.info("Transferred " + counter.toString());
+      if (extTableThread.hasExceptions()) {
+        extTableThread.printException();
+        throw new IOException(extTableThread.getException());
       }
     }
   }
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaJDBCStatementRunner.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaJDBCStatementRunner.java
index cedfd23..a6a4481 100644
--- a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaJDBCStatementRunner.java
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaJDBCStatementRunner.java
@@ -21,6 +21,7 @@
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,7 +39,7 @@
   private Connection con;
   private Exception exception;
   private PreparedStatement ps;
-  private Thread parent;
+  private AtomicBoolean failed;
 
   public boolean hasExceptions() {
     return exception != null;
@@ -58,9 +59,16 @@
     return exception;
   }
 
-  public NetezzaJDBCStatementRunner(Thread parent, Connection con,
-      String sqlStatement) throws SQLException {
-    this.parent = parent;
+  /**
+   * Execute Netezza SQL statement on given connection.
+   * @param failed Set this to true if the operation fails.
+   * @param con connection
+   * @param sqlStatement statement to execute
+   * @throws SQLException
+   */
+  public NetezzaJDBCStatementRunner(AtomicBoolean failed, Connection con,
+                                    String sqlStatement) throws SQLException {
+    this.failed = failed;
     this.con = con;
     this.ps = con.prepareStatement(sqlStatement);
     this.exception = null;
@@ -89,7 +97,7 @@
       con = null;
     }
     if (interruptParent) {
-      this.parent.interrupt();
+      failed.set(true);
     }
   }
 }
diff --git a/src/test/org/apache/sqoop/mapreduce/db/netezza/TestNetezzaExternalTableExportMapper.java b/src/test/org/apache/sqoop/mapreduce/db/netezza/TestNetezzaExternalTableExportMapper.java
new file mode 100644
index 0000000..5e55871
--- /dev/null
+++ b/src/test/org/apache/sqoop/mapreduce/db/netezza/TestNetezzaExternalTableExportMapper.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.db.netezza;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.RuleChain;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Verifier;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestNetezzaExternalTableExportMapper {
+
+  // chained rule, see #rules
+  private Verifier verifyThatLogsAreUploaded = new Verifier() {
+    @Override public void verify() {
+      File jobDir = tmpFolder.getRoot().toPath().resolve("job_job001_0001").resolve("job__0000-0-0").toFile();
+      assertThat(jobDir.exists(), is(true));
+      assertThat(jobDir.listFiles().length, is(equalTo(1)));
+      assertThat(jobDir.listFiles()[0].getName(), is(equalTo("TEST.nzlog")));
+      try {
+        assertThat(FileUtils.readFileToString(jobDir.listFiles()[0]), is(equalTo("test log")));
+      } catch (IOException e) {
+        e.printStackTrace();
+        fail("Failed to read log file.");
+      }
+    }
+  };
+
+  // chained rule, see #rules
+  private TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  // need to keep tmpFolder around to verify logs
+  @Rule
+  public RuleChain rules = RuleChain.outerRule(tmpFolder).around(verifyThatLogsAreUploaded);
+
+  @Rule
+  public final ExpectedException exception = ExpectedException.none();
+
+  private static final SQLException testException = new SQLException("failed in test");
+
+  private NetezzaExternalTableExportMapper<LongWritable, Text> mapper;
+  private Mapper.Context context;
+
+  @Before
+  public void setUp() throws Exception {
+    mapper = basicMockingOfMapper();
+    context = getContext();
+  }
+
+  @Test
+  public void testPassingJDBC() throws Exception {
+    withNoopJDBCOperation(mapper).run(context);
+  }
+
+  @Test
+  public void testFailingJDBC() throws Exception {
+    withFailingJDBCOperation(mapper);
+
+    exception.expect(IOException.class);
+    exception.expectCause(is(equalTo(testException)));
+    mapper.run(context);
+  }
+
+  /**
+   * Creates an instance of NetezzaExternalTableExportMapper with the
+   * necessary fields mocked to be able to call the run() method without errors.
+   * @return
+   */
+  private NetezzaExternalTableExportMapper<LongWritable, Text> basicMockingOfMapper() {
+    NetezzaExternalTableExportMapper<LongWritable, Text> mapper = new NetezzaExternalTableExportMapper<LongWritable, Text>() {
+      @Override
+      public void map(LongWritable key, Text text, Context context) {
+        // no-op. Don't read from context, mock won't be ready to handle that.
+      }
+    };
+
+    mapper.logDir = tmpFolder.getRoot().getAbsolutePath();
+
+    return mapper;
+  }
+
+  /**
+   * Mocks mapper's DB connection in a way that leads to SQLException during the JDBC operation.
+   * @param mapper will modify this object
+   * @return modified mapper
+   * @throws Exception
+   */
+  private NetezzaExternalTableExportMapper<LongWritable, Text> withFailingJDBCOperation(NetezzaExternalTableExportMapper<LongWritable, Text> mapper) throws Exception {
+    Connection connectionMock = mock(Connection.class);
+
+    // PreparadStatement mock should imitate loading stuff from FIFO into Netezza
+    PreparedStatement psMock = mock(PreparedStatement.class);
+    when(psMock.execute()).then(invocation -> {
+      // Write log file under taskAttemptDir to be able to check log upload
+      File logFile = mapper.taskAttemptDir.toPath().resolve("job__0000-0-0").resolve("TEST.nzlog").toFile();
+      FileUtils.writeStringToFile(logFile, "test log");
+
+      // Need to open FIFO for reading, otherwise writing would hang
+      FileInputStream fis = new FileInputStream(mapper.fifoFile.getAbsoluteFile());
+
+      // Simulate delay
+      Thread.sleep(200);
+      throw testException;
+    });
+    when(connectionMock.prepareStatement(anyString())).thenReturn(psMock);
+
+    DBConfiguration dbcMock = mock(DBConfiguration.class);
+    when(dbcMock.getConnection()).thenReturn(connectionMock);
+    mapper.dbc = dbcMock;
+    return mapper;
+  }
+
+
+  /**
+   * Mocks mapper's DB connection to execute a no-op JDBC operation.
+   * @param mapper will modify this object
+   * @return modified mapper
+   * @throws Exception
+   */
+  private NetezzaExternalTableExportMapper<LongWritable, Text> withNoopJDBCOperation(NetezzaExternalTableExportMapper<LongWritable, Text> mapper) throws Exception {
+    Connection connectionMock = mock(Connection.class);
+
+    // PreparadStatement mock should imitate loading stuff from FIFO into Netezza
+    PreparedStatement psMock = mock(PreparedStatement.class);
+    when(psMock.execute()).then(invocation -> {
+      // Write log file under taskAttemptDir to be able to check log upload
+      File logFile = mapper.taskAttemptDir.toPath().resolve("job__0000-0-0").resolve("TEST.nzlog").toFile();
+      FileUtils.writeStringToFile(logFile, "test log");
+
+      // Need to open FIFO for reading, otherwise writing would hang
+      new FileInputStream(mapper.fifoFile.getAbsoluteFile());
+
+      // Simulate delay
+      Thread.sleep(200);
+      return true;
+    });
+    when(connectionMock.prepareStatement(anyString())).thenReturn(psMock);
+
+    DBConfiguration dbcMock = mock(DBConfiguration.class);
+    when(dbcMock.getConnection()).thenReturn(connectionMock);
+    mapper.dbc = dbcMock;
+    return mapper;
+  }
+
+
+  /**
+   * Creates simple mapreduce context that says it has a single record but won't actually
+   * return any records as tests are not expected to read the records.
+   * @return
+   * @throws java.io.IOException
+   * @throws InterruptedException
+   */
+  private Mapper.Context getContext() throws java.io.IOException, InterruptedException {
+    Mapper.Context context = mock(Mapper.Context.class);
+
+    Configuration conf = new Configuration();
+    when(context.getConfiguration()).thenReturn(conf);
+
+    TaskAttemptID taskAttemptID = new TaskAttemptID();
+    when(context.getTaskAttemptID()).thenReturn(taskAttemptID);
+
+    JobID jobID = new JobID("job001", 1);
+    when(context.getJobID()).thenReturn(jobID);
+
+    // Simulate a single record by answering 'true' once
+    when(context.nextKeyValue()).thenAnswer(new Answer<Object>() {
+      boolean answer = true;
+
+      @Override
+      public Object answer(InvocationOnMock invocation) {
+        if (answer == true) {
+          answer = false;
+          return true;
+        }
+        return false;
+      }
+    });
+
+    return context;
+  }
+
+}
\ No newline at end of file
diff --git a/src/test/org/apache/sqoop/mapreduce/db/netezza/TestNetezzaExternalTableImportMapper.java b/src/test/org/apache/sqoop/mapreduce/db/netezza/TestNetezzaExternalTableImportMapper.java
new file mode 100644
index 0000000..1a69437
--- /dev/null
+++ b/src/test/org/apache/sqoop/mapreduce/db/netezza/TestNetezzaExternalTableImportMapper.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.db.netezza;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestNetezzaExternalTableImportMapper {
+
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Rule
+  public final ExpectedException exception = ExpectedException.none();
+
+  private static final SQLException testException = new SQLException("failed in test");
+
+  private NetezzaExternalTableImportMapper<LongWritable, Text> mapper;
+  private Mapper.Context context;
+
+  @Before
+  public void setUp() {
+    mapper = basicMockingOfMapper();
+    context = getContext();
+  }
+
+  @Test
+  public void testPassingJDBC() throws Exception {
+    withNoopJDBCOperation(mapper).map(1, null, context);
+  }
+
+  @Test
+  public void testFailingJDBC() throws Exception {
+    withFailingJDBCOperation(mapper);
+
+    exception.expect(IOException.class);
+    exception.expectCause(is(equalTo(testException)));
+    mapper.map(1, null, context);
+  }
+
+  /**
+   * Creates an instance of NetezzaExternalTableExportMapper with the
+   * necessary fields mocked to be able to call the run() method without errors.
+   * @return
+   */
+  private NetezzaExternalTableImportMapper<LongWritable, Text> basicMockingOfMapper() {
+    return new NetezzaExternalTableImportMapper<LongWritable, Text>() {
+      @Override
+      protected void writeRecord(Text text, Context context) {
+        // no-op. Don't read from context, mock won't be ready to handle that.
+      }
+    };
+  }
+
+  /**
+   * Mocks mapper's DB connection in a way that leads to SQLException during the JDBC operation.
+   * @param mapper will modify this object
+   * @return modified mapper
+   * @throws Exception
+   */
+  private NetezzaExternalTableImportMapper<LongWritable, Text> withFailingJDBCOperation(NetezzaExternalTableImportMapper<LongWritable, Text> mapper) throws Exception {
+    Connection connectionMock = mock(Connection.class);
+
+    // PreparadStatement mock should imitate loading stuff from FIFO into Netezza
+    PreparedStatement psMock = mock(PreparedStatement.class);
+    when(psMock.execute()).then(invocation -> {
+      // Write log file under taskAttemptDir to be able to check log upload
+      File logFile = mapper.taskAttemptDir.toPath().resolve("job__0000-0-0").resolve("TEST.nzlog").toFile();
+      FileUtils.writeStringToFile(logFile, "test log");
+
+      // Need to open FIFO for writing, otherwise reading would hang
+      FileOutputStream fos = new FileOutputStream(mapper.fifoFile.getAbsoluteFile());
+
+      // Simulate delay
+      Thread.sleep(200);
+
+      // Write single record, then throw
+      fos.write("test record".getBytes());
+      fos.close();
+      throw testException;
+    });
+    when(connectionMock.prepareStatement(anyString())).thenReturn(psMock);
+
+    DBConfiguration dbcMock = mock(DBConfiguration.class);
+    when(dbcMock.getConnection()).thenReturn(connectionMock);
+    mapper.dbc = dbcMock;
+    return mapper;
+  }
+
+
+  /**
+   * Mocks mapper's DB connection to execute a no-op JDBC operation.
+   * @param mapper will modify this object
+   * @return modified mapper
+   * @throws Exception
+   */
+  private NetezzaExternalTableImportMapper<LongWritable, Text> withNoopJDBCOperation(NetezzaExternalTableImportMapper<LongWritable, Text> mapper) throws Exception {
+    Connection connectionMock = mock(Connection.class);
+
+    // PreparadStatement mock should imitate loading stuff from FIFO into Netezza
+    PreparedStatement psMock = mock(PreparedStatement.class);
+    when(psMock.execute()).then(invocation -> {
+      // Need to open FIFO for writing, otherwise reading would hang
+      FileOutputStream fos = new FileOutputStream(mapper.fifoFile.getAbsoluteFile());
+
+      // Simulate delay
+      Thread.sleep(200);
+
+      // Write single record and return
+      fos.write("test record".getBytes());
+      fos.close();
+      return true;
+    });
+    when(connectionMock.prepareStatement(anyString())).thenReturn(psMock);
+
+    DBConfiguration dbcMock = mock(DBConfiguration.class);
+    when(dbcMock.getConnection()).thenReturn(connectionMock);
+    mapper.dbc = dbcMock;
+    return mapper;
+  }
+
+
+  /**
+   * Creates simple mapreduce context that says it has a single record but won't actually
+   * return any records as tests are not expected to read the records.
+   * @return
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private Mapper.Context getContext() {
+    Mapper.Context context = mock(Mapper.Context.class);
+
+    Configuration conf = new Configuration();
+    when(context.getConfiguration()).thenReturn(conf);
+
+    TaskAttemptID taskAttemptID = new TaskAttemptID();
+    when(context.getTaskAttemptID()).thenReturn(taskAttemptID);
+
+    return context;
+  }
+
+}
\ No newline at end of file