SQOOP-3378: Error during direct Netezza import/export can interrupt process in uncontrolled ways
(Daniel Voros via Szabolcs Vasas)
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java
index 5bf2188..6dbb98d 100644
--- a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableExportMapper.java
@@ -18,26 +18,14 @@
package org.apache.sqoop.mapreduce.db.netezza;
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.sql.Connection;
-import java.sql.SQLException;
-
-import org.apache.commons.io.IOUtils;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.sqoop.io.NamedFifo;
+import org.apache.sqoop.lib.DelimiterSet;
import org.apache.sqoop.lib.SqoopRecord;
import org.apache.sqoop.manager.DirectNetezzaManager;
import org.apache.sqoop.mapreduce.SqoopMapper;
@@ -46,7 +34,14 @@
import org.apache.sqoop.util.PerfCounters;
import org.apache.sqoop.util.TaskId;
-import org.apache.sqoop.lib.DelimiterSet;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* Netezza export mapper using external tables.
@@ -59,8 +54,10 @@
*/
private Configuration conf;
- private DBConfiguration dbc;
- private File fifoFile;
+ @VisibleForTesting
+ DBConfiguration dbc;
+ @VisibleForTesting
+ File fifoFile;
private Connection con;
private OutputStream recordWriter;
public static final Log LOG = LogFactory
@@ -69,8 +66,12 @@
private PerfCounters counter;
private DelimiterSet outputDelimiters;
private String localLogDir = null;
- private String logDir = null;
- private File taskAttemptDir = null;
+ @VisibleForTesting
+ String logDir = null;
+ @VisibleForTesting
+ File taskAttemptDir = null;
+
+ private AtomicBoolean jdbcFailed = new AtomicBoolean(false);
private String getSqlStatement(DelimiterSet delimiters) throws IOException {
@@ -168,9 +169,13 @@
taskAttemptDir = TaskId.getLocalWorkPath(conf);
localLogDir =
DirectNetezzaManager.getLocalLogDir(context.getTaskAttemptID());
- logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT);
+ if (logDir == null) { // need to be able to set in tests
+ logDir = conf.get(DirectNetezzaManager.NETEZZA_LOG_DIR_OPT);
+ }
- dbc = new DBConfiguration(conf);
+ if (dbc == null) { // need to be able to mock in tests
+ dbc = new DBConfiguration(conf);
+ }
File taskAttemptDir = TaskId.getLocalWorkPath(conf);
char fd = (char) conf.getInt(DelimiterSet.INPUT_FIELD_DELIM_KEY, ',');
@@ -196,7 +201,7 @@
boolean cleanup = false;
try {
con = dbc.getConnection();
- extTableThread = new NetezzaJDBCStatementRunner(Thread.currentThread(),
+ extTableThread = new NetezzaJDBCStatementRunner(jdbcFailed,
con, sqlStmt);
} catch (SQLException sqle) {
cleanup = true;
@@ -226,49 +231,43 @@
public void run(Context context) throws IOException, InterruptedException {
setup(context);
initNetezzaExternalTableExport(context);
- if (extTableThread.isAlive()) {
- try {
- while (context.nextKeyValue()) {
- if (Thread.interrupted()) {
- if (!extTableThread.isAlive()) {
- break;
- }
- }
- map(context.getCurrentKey(), context.getCurrentValue(), context);
+ try {
+ while (context.nextKeyValue()) {
+ // Fail fast if there was an error during JDBC operation
+ if (jdbcFailed.get()) {
+ break;
}
- cleanup(context);
- } finally {
- try {
- recordWriter.close();
- extTableThread.join();
- } catch (Exception e) {
- LOG.debug("Exception cleaning up mapper operation : " + e.getMessage());
- }
- counter.stopClock();
- LOG.info("Transferred " + counter.toString());
- FileUploader.uploadFilesToDFS(taskAttemptDir.getAbsolutePath(),
- localLogDir, logDir, context.getJobID().toString(),
- conf);
-
- if (extTableThread.hasExceptions()) {
- extTableThread.printException();
- throw new IOException(extTableThread.getException());
- }
+ map(context.getCurrentKey(), context.getCurrentValue(), context);
}
+ cleanup(context);
+ } finally {
+ try {
+ recordWriter.close();
+ extTableThread.join();
+ } catch (Exception e) {
+ LOG.debug("Exception cleaning up mapper operation : " + e.getMessage());
+ }
+ counter.stopClock();
+ LOG.info("Transferred " + counter.toString());
+ FileUploader.uploadFilesToDFS(taskAttemptDir.getAbsolutePath(),
+ localLogDir, logDir, context.getJobID().toString(),
+ conf);
+ if (extTableThread.hasExceptions()) {
+ extTableThread.printException();
+ throw new IOException(extTableThread.getException());
+ }
}
}
- protected void writeTextRecord(Text record) throws IOException,
- InterruptedException {
+ protected void writeTextRecord(Text record) throws IOException {
String outputStr = record.toString() + "\n";
byte[] outputBytes = outputStr.getBytes("UTF-8");
counter.addBytes(outputBytes.length);
recordWriter.write(outputBytes, 0, outputBytes.length);
}
- protected void writeSqoopRecord(SqoopRecord sqr) throws IOException,
- InterruptedException {
+ protected void writeSqoopRecord(SqoopRecord sqr) throws IOException {
String outputStr = sqr.toString(this.outputDelimiters);
byte[] outputBytes = outputStr.getBytes("UTF-8");
counter.addBytes(outputBytes.length);
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java
index 306062a..3124b17 100644
--- a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaExternalTableImportMapper.java
@@ -18,22 +18,10 @@
package org.apache.sqoop.mapreduce.db.netezza;
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.sql.Connection;
-import java.sql.SQLException;
-
-import org.apache.commons.io.IOUtils;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.sqoop.config.ConfigurationHelper;
@@ -42,10 +30,18 @@
import org.apache.sqoop.manager.DirectNetezzaManager;
import org.apache.sqoop.mapreduce.AutoProgressMapper;
import org.apache.sqoop.mapreduce.db.DBConfiguration;
-import org.apache.sqoop.util.FileUploader;
import org.apache.sqoop.util.PerfCounters;
import org.apache.sqoop.util.TaskId;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
/**
* Netezza import mapper using external tables.
*/
@@ -57,8 +53,10 @@
*/
private Configuration conf;
- private DBConfiguration dbc;
- private File fifoFile;
+ @VisibleForTesting
+ DBConfiguration dbc;
+ @VisibleForTesting
+ File fifoFile;
private int numMappers;
private Connection con;
private BufferedReader recordReader;
@@ -66,7 +64,11 @@
.getLog(NetezzaExternalTableImportMapper.class.getName());
private NetezzaJDBCStatementRunner extTableThread;
private PerfCounters counter;
- private File taskAttemptDir = null;
+ @VisibleForTesting
+ File taskAttemptDir = null;
+
+ private AtomicBoolean jdbcFailed = new AtomicBoolean(false);
+
private String getSqlStatement(int myId) throws IOException {
char fd = (char) conf.getInt(DelimiterSet.OUTPUT_FIELD_DELIM_KEY, ',');
@@ -143,8 +145,9 @@
private void initNetezzaExternalTableImport(int myId) throws IOException {
- taskAttemptDir = TaskId.getLocalWorkPath(conf);
-
+ if (taskAttemptDir == null) {
+ taskAttemptDir = TaskId.getLocalWorkPath(conf);
+ }
this.fifoFile = new File(taskAttemptDir, ("nzexttable-" + myId + ".txt"));
String filename = fifoFile.toString();
NamedFifo nf;
@@ -163,7 +166,7 @@
boolean cleanup = false;
try {
con = dbc.getConnection();
- extTableThread = new NetezzaJDBCStatementRunner(Thread.currentThread(),
+ extTableThread = new NetezzaJDBCStatementRunner(jdbcFailed,
con, sqlStmt);
} catch (SQLException sqle) {
cleanup = true;
@@ -197,39 +200,38 @@
conf = context.getConfiguration();
- dbc = new DBConfiguration(conf);
+ if (dbc == null) { // need to be able to mock in tests
+ dbc = new DBConfiguration(conf);
+ }
numMappers = ConfigurationHelper.getConfNumMaps(conf);
char rd = (char) conf.getInt(DelimiterSet.OUTPUT_RECORD_DELIM_KEY, '\n');
initNetezzaExternalTableImport(dataSliceId);
counter = new PerfCounters();
counter.startClock();
Text outputRecord = new Text();
- if (extTableThread.isAlive()) {
- try {
- String inputRecord = recordReader.readLine();
- while (inputRecord != null) {
- if (Thread.interrupted()) {
- if (!extTableThread.isAlive()) {
- break;
- }
- }
- outputRecord.set(inputRecord + rd);
- // May be we should set the output to be String for faster performance
- // There is no real benefit in changing it to Text and then
- // converting it back in our case
- writeRecord(outputRecord, context);
- counter.addBytes(1 + inputRecord.length());
- inputRecord = recordReader.readLine();
+ try {
+ String inputRecord = recordReader.readLine();
+ while (inputRecord != null) {
+ // Fail fast if there was an error during JDBC operation
+ if (jdbcFailed.get()) {
+ break;
}
- } finally {
- recordReader.close();
- extTableThread.join();
- counter.stopClock();
- LOG.info("Transferred " + counter.toString());
- if (extTableThread.hasExceptions()) {
- extTableThread.printException();
- throw new IOException(extTableThread.getException());
- }
+ outputRecord.set(inputRecord + rd);
+ // May be we should set the output to be String for faster performance
+ // There is no real benefit in changing it to Text and then
+ // converting it back in our case
+ writeRecord(outputRecord, context);
+ counter.addBytes(1 + inputRecord.length());
+ inputRecord = recordReader.readLine();
+ }
+ } finally {
+ recordReader.close();
+ extTableThread.join();
+ counter.stopClock();
+ LOG.info("Transferred " + counter.toString());
+ if (extTableThread.hasExceptions()) {
+ extTableThread.printException();
+ throw new IOException(extTableThread.getException());
}
}
}
diff --git a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaJDBCStatementRunner.java b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaJDBCStatementRunner.java
index cedfd23..a6a4481 100644
--- a/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaJDBCStatementRunner.java
+++ b/src/java/org/apache/sqoop/mapreduce/db/netezza/NetezzaJDBCStatementRunner.java
@@ -21,6 +21,7 @@
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -38,7 +39,7 @@
private Connection con;
private Exception exception;
private PreparedStatement ps;
- private Thread parent;
+ private AtomicBoolean failed;
public boolean hasExceptions() {
return exception != null;
@@ -58,9 +59,16 @@
return exception;
}
- public NetezzaJDBCStatementRunner(Thread parent, Connection con,
- String sqlStatement) throws SQLException {
- this.parent = parent;
+ /**
+ * Execute Netezza SQL statement on given connection.
+ * @param failed Set this to true if the operation fails.
+ * @param con connection
+ * @param sqlStatement statement to execute
+ * @throws SQLException
+ */
+ public NetezzaJDBCStatementRunner(AtomicBoolean failed, Connection con,
+ String sqlStatement) throws SQLException {
+ this.failed = failed;
this.con = con;
this.ps = con.prepareStatement(sqlStatement);
this.exception = null;
@@ -89,7 +97,7 @@
con = null;
}
if (interruptParent) {
- this.parent.interrupt();
+ failed.set(true);
}
}
}
diff --git a/src/test/org/apache/sqoop/mapreduce/db/netezza/TestNetezzaExternalTableExportMapper.java b/src/test/org/apache/sqoop/mapreduce/db/netezza/TestNetezzaExternalTableExportMapper.java
new file mode 100644
index 0000000..5e55871
--- /dev/null
+++ b/src/test/org/apache/sqoop/mapreduce/db/netezza/TestNetezzaExternalTableExportMapper.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.db.netezza;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.RuleChain;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.Verifier;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestNetezzaExternalTableExportMapper {
+
+ // chained rule, see #rules
+ private Verifier verifyThatLogsAreUploaded = new Verifier() {
+ @Override public void verify() {
+ File jobDir = tmpFolder.getRoot().toPath().resolve("job_job001_0001").resolve("job__0000-0-0").toFile();
+ assertThat(jobDir.exists(), is(true));
+ assertThat(jobDir.listFiles().length, is(equalTo(1)));
+ assertThat(jobDir.listFiles()[0].getName(), is(equalTo("TEST.nzlog")));
+ try {
+ assertThat(FileUtils.readFileToString(jobDir.listFiles()[0]), is(equalTo("test log")));
+ } catch (IOException e) {
+ e.printStackTrace();
+ fail("Failed to read log file.");
+ }
+ }
+ };
+
+ // chained rule, see #rules
+ private TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ // need to keep tmpFolder around to verify logs
+ @Rule
+ public RuleChain rules = RuleChain.outerRule(tmpFolder).around(verifyThatLogsAreUploaded);
+
+ @Rule
+ public final ExpectedException exception = ExpectedException.none();
+
+ private static final SQLException testException = new SQLException("failed in test");
+
+ private NetezzaExternalTableExportMapper<LongWritable, Text> mapper;
+ private Mapper.Context context;
+
+ @Before
+ public void setUp() throws Exception {
+ mapper = basicMockingOfMapper();
+ context = getContext();
+ }
+
+ @Test
+ public void testPassingJDBC() throws Exception {
+ withNoopJDBCOperation(mapper).run(context);
+ }
+
+ @Test
+ public void testFailingJDBC() throws Exception {
+ withFailingJDBCOperation(mapper);
+
+ exception.expect(IOException.class);
+ exception.expectCause(is(equalTo(testException)));
+ mapper.run(context);
+ }
+
+ /**
+ * Creates an instance of NetezzaExternalTableExportMapper with the
+ * necessary fields mocked to be able to call the run() method without errors.
+ * @return
+ */
+ private NetezzaExternalTableExportMapper<LongWritable, Text> basicMockingOfMapper() {
+ NetezzaExternalTableExportMapper<LongWritable, Text> mapper = new NetezzaExternalTableExportMapper<LongWritable, Text>() {
+ @Override
+ public void map(LongWritable key, Text text, Context context) {
+ // no-op. Don't read from context, mock won't be ready to handle that.
+ }
+ };
+
+ mapper.logDir = tmpFolder.getRoot().getAbsolutePath();
+
+ return mapper;
+ }
+
+ /**
+ * Mocks mapper's DB connection in a way that leads to SQLException during the JDBC operation.
+ * @param mapper will modify this object
+ * @return modified mapper
+ * @throws Exception
+ */
+ private NetezzaExternalTableExportMapper<LongWritable, Text> withFailingJDBCOperation(NetezzaExternalTableExportMapper<LongWritable, Text> mapper) throws Exception {
+ Connection connectionMock = mock(Connection.class);
+
+ // PreparadStatement mock should imitate loading stuff from FIFO into Netezza
+ PreparedStatement psMock = mock(PreparedStatement.class);
+ when(psMock.execute()).then(invocation -> {
+ // Write log file under taskAttemptDir to be able to check log upload
+ File logFile = mapper.taskAttemptDir.toPath().resolve("job__0000-0-0").resolve("TEST.nzlog").toFile();
+ FileUtils.writeStringToFile(logFile, "test log");
+
+ // Need to open FIFO for reading, otherwise writing would hang
+ FileInputStream fis = new FileInputStream(mapper.fifoFile.getAbsoluteFile());
+
+ // Simulate delay
+ Thread.sleep(200);
+ throw testException;
+ });
+ when(connectionMock.prepareStatement(anyString())).thenReturn(psMock);
+
+ DBConfiguration dbcMock = mock(DBConfiguration.class);
+ when(dbcMock.getConnection()).thenReturn(connectionMock);
+ mapper.dbc = dbcMock;
+ return mapper;
+ }
+
+
+ /**
+ * Mocks mapper's DB connection to execute a no-op JDBC operation.
+ * @param mapper will modify this object
+ * @return modified mapper
+ * @throws Exception
+ */
+ private NetezzaExternalTableExportMapper<LongWritable, Text> withNoopJDBCOperation(NetezzaExternalTableExportMapper<LongWritable, Text> mapper) throws Exception {
+ Connection connectionMock = mock(Connection.class);
+
+ // PreparadStatement mock should imitate loading stuff from FIFO into Netezza
+ PreparedStatement psMock = mock(PreparedStatement.class);
+ when(psMock.execute()).then(invocation -> {
+ // Write log file under taskAttemptDir to be able to check log upload
+ File logFile = mapper.taskAttemptDir.toPath().resolve("job__0000-0-0").resolve("TEST.nzlog").toFile();
+ FileUtils.writeStringToFile(logFile, "test log");
+
+ // Need to open FIFO for reading, otherwise writing would hang
+ new FileInputStream(mapper.fifoFile.getAbsoluteFile());
+
+ // Simulate delay
+ Thread.sleep(200);
+ return true;
+ });
+ when(connectionMock.prepareStatement(anyString())).thenReturn(psMock);
+
+ DBConfiguration dbcMock = mock(DBConfiguration.class);
+ when(dbcMock.getConnection()).thenReturn(connectionMock);
+ mapper.dbc = dbcMock;
+ return mapper;
+ }
+
+
+ /**
+ * Creates simple mapreduce context that says it has a single record but won't actually
+ * return any records as tests are not expected to read the records.
+ * @return
+ * @throws java.io.IOException
+ * @throws InterruptedException
+ */
+ private Mapper.Context getContext() throws java.io.IOException, InterruptedException {
+ Mapper.Context context = mock(Mapper.Context.class);
+
+ Configuration conf = new Configuration();
+ when(context.getConfiguration()).thenReturn(conf);
+
+ TaskAttemptID taskAttemptID = new TaskAttemptID();
+ when(context.getTaskAttemptID()).thenReturn(taskAttemptID);
+
+ JobID jobID = new JobID("job001", 1);
+ when(context.getJobID()).thenReturn(jobID);
+
+ // Simulate a single record by answering 'true' once
+ when(context.nextKeyValue()).thenAnswer(new Answer<Object>() {
+ boolean answer = true;
+
+ @Override
+ public Object answer(InvocationOnMock invocation) {
+ if (answer == true) {
+ answer = false;
+ return true;
+ }
+ return false;
+ }
+ });
+
+ return context;
+ }
+
+}
\ No newline at end of file
diff --git a/src/test/org/apache/sqoop/mapreduce/db/netezza/TestNetezzaExternalTableImportMapper.java b/src/test/org/apache/sqoop/mapreduce/db/netezza/TestNetezzaExternalTableImportMapper.java
new file mode 100644
index 0000000..1a69437
--- /dev/null
+++ b/src/test/org/apache/sqoop/mapreduce/db/netezza/TestNetezzaExternalTableImportMapper.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.sqoop.mapreduce.db.netezza;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.sqoop.mapreduce.db.DBConfiguration;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestNetezzaExternalTableImportMapper {
+
+ @Rule
+ public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @Rule
+ public final ExpectedException exception = ExpectedException.none();
+
+ private static final SQLException testException = new SQLException("failed in test");
+
+ private NetezzaExternalTableImportMapper<LongWritable, Text> mapper;
+ private Mapper.Context context;
+
+ @Before
+ public void setUp() {
+ mapper = basicMockingOfMapper();
+ context = getContext();
+ }
+
+ @Test
+ public void testPassingJDBC() throws Exception {
+ withNoopJDBCOperation(mapper).map(1, null, context);
+ }
+
+ @Test
+ public void testFailingJDBC() throws Exception {
+ withFailingJDBCOperation(mapper);
+
+ exception.expect(IOException.class);
+ exception.expectCause(is(equalTo(testException)));
+ mapper.map(1, null, context);
+ }
+
+ /**
+ * Creates an instance of NetezzaExternalTableExportMapper with the
+ * necessary fields mocked to be able to call the run() method without errors.
+ * @return
+ */
+ private NetezzaExternalTableImportMapper<LongWritable, Text> basicMockingOfMapper() {
+ return new NetezzaExternalTableImportMapper<LongWritable, Text>() {
+ @Override
+ protected void writeRecord(Text text, Context context) {
+ // no-op. Don't read from context, mock won't be ready to handle that.
+ }
+ };
+ }
+
+ /**
+ * Mocks mapper's DB connection in a way that leads to SQLException during the JDBC operation.
+ * @param mapper will modify this object
+ * @return modified mapper
+ * @throws Exception
+ */
+ private NetezzaExternalTableImportMapper<LongWritable, Text> withFailingJDBCOperation(NetezzaExternalTableImportMapper<LongWritable, Text> mapper) throws Exception {
+ Connection connectionMock = mock(Connection.class);
+
+ // PreparadStatement mock should imitate loading stuff from FIFO into Netezza
+ PreparedStatement psMock = mock(PreparedStatement.class);
+ when(psMock.execute()).then(invocation -> {
+ // Write log file under taskAttemptDir to be able to check log upload
+ File logFile = mapper.taskAttemptDir.toPath().resolve("job__0000-0-0").resolve("TEST.nzlog").toFile();
+ FileUtils.writeStringToFile(logFile, "test log");
+
+ // Need to open FIFO for writing, otherwise reading would hang
+ FileOutputStream fos = new FileOutputStream(mapper.fifoFile.getAbsoluteFile());
+
+ // Simulate delay
+ Thread.sleep(200);
+
+ // Write single record, then throw
+ fos.write("test record".getBytes());
+ fos.close();
+ throw testException;
+ });
+ when(connectionMock.prepareStatement(anyString())).thenReturn(psMock);
+
+ DBConfiguration dbcMock = mock(DBConfiguration.class);
+ when(dbcMock.getConnection()).thenReturn(connectionMock);
+ mapper.dbc = dbcMock;
+ return mapper;
+ }
+
+
+ /**
+ * Mocks mapper's DB connection to execute a no-op JDBC operation.
+ * @param mapper will modify this object
+ * @return modified mapper
+ * @throws Exception
+ */
+ private NetezzaExternalTableImportMapper<LongWritable, Text> withNoopJDBCOperation(NetezzaExternalTableImportMapper<LongWritable, Text> mapper) throws Exception {
+ Connection connectionMock = mock(Connection.class);
+
+ // PreparadStatement mock should imitate loading stuff from FIFO into Netezza
+ PreparedStatement psMock = mock(PreparedStatement.class);
+ when(psMock.execute()).then(invocation -> {
+ // Need to open FIFO for writing, otherwise reading would hang
+ FileOutputStream fos = new FileOutputStream(mapper.fifoFile.getAbsoluteFile());
+
+ // Simulate delay
+ Thread.sleep(200);
+
+ // Write single record and return
+ fos.write("test record".getBytes());
+ fos.close();
+ return true;
+ });
+ when(connectionMock.prepareStatement(anyString())).thenReturn(psMock);
+
+ DBConfiguration dbcMock = mock(DBConfiguration.class);
+ when(dbcMock.getConnection()).thenReturn(connectionMock);
+ mapper.dbc = dbcMock;
+ return mapper;
+ }
+
+
+ /**
+ * Creates simple mapreduce context that says it has a single record but won't actually
+ * return any records as tests are not expected to read the records.
+ * @return
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ private Mapper.Context getContext() {
+ Mapper.Context context = mock(Mapper.Context.class);
+
+ Configuration conf = new Configuration();
+ when(context.getConfiguration()).thenReturn(conf);
+
+ TaskAttemptID taskAttemptID = new TaskAttemptID();
+ when(context.getTaskAttemptID()).thenReturn(taskAttemptID);
+
+ return context;
+ }
+
+}
\ No newline at end of file