SQOOP-2343: AsyncSqlRecordWriter stucks if any exception is thrown out in its close method
(Yibing Shi via Jarek Jarcec Cecho)
diff --git a/src/java/org/apache/sqoop/mapreduce/AsyncSqlRecordWriter.java b/src/java/org/apache/sqoop/mapreduce/AsyncSqlRecordWriter.java
index d0e1711..15a62a6 100644
--- a/src/java/org/apache/sqoop/mapreduce/AsyncSqlRecordWriter.java
+++ b/src/java/org/apache/sqoop/mapreduce/AsyncSqlRecordWriter.java
@@ -61,6 +61,8 @@
private AsyncSqlOutputFormat.AsyncSqlExecThread execThread;
private boolean startedExecThread;
+ private boolean closed;
+
public AsyncSqlRecordWriter(TaskAttemptContext context)
throws ClassNotFoundException, SQLException {
this.conf = context.getConfiguration();
@@ -82,6 +84,8 @@
connection, stmtsPerTx);
this.execThread.setDaemon(true);
this.startedExecThread = false;
+
+ this.closed = false;
}
/**
@@ -176,6 +180,15 @@
/** {@inheritDoc} */
public void close(TaskAttemptContext context)
throws IOException, InterruptedException {
+ // If any exception is thrown out in this method, mapreduce framework catches the exception and
+ // calls this method again in case the recorder hasn't bee closed properly. Without the
+ // protection below, it can make the main thread stuck in execThread.put since there is no
+ // receiver for the synchronous queue any more.
+ if (closed) {
+ return;
+ }
+ closed = true;
+
try {
try {
execUpdate(true, true);