TAJO-2143: Fix race condition in task history writer.
diff --git a/CHANGES b/CHANGES
index 226a3ab..009cf13 100644
--- a/CHANGES
+++ b/CHANGES
@@ -12,6 +12,8 @@
BUG FIXES
+ TAJO-2143: Fix race condition in task history writer. (jinho)
+
TAJO-2140: TajoInternalError does not contains reason stack trace. (jinho)
TAJO-2135: Invalid join result when join key columns contain nulls. (jihoon)
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java b/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
index 3d2578c..448f1f1 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
@@ -39,6 +39,7 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
@@ -283,7 +284,10 @@
TaskAttemptId id2 = TajoIdUtils.parseTaskAttemptId("ta_1412326813565_0001_000001_000002_00");
org.apache.tajo.worker.TaskHistory taskHistory2 = new org.apache.tajo.worker.TaskHistory(
id2, TaskAttemptState.TA_SUCCEEDED, 1.0f, startTime, System.currentTimeMillis() - 500, tableStats);
- writer.appendAndSync(taskHistory2);
+ writer.appendHistory(taskHistory2);
+
+ HistoryWriter.WriterFuture future = writer.flushTaskHistories();
+ future.get(10, TimeUnit.SECONDS);
SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
String startDate = df.format(new Date(startTime));
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
index 5fca7a7..0d1d7f2 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
@@ -175,16 +175,30 @@
}
}
- /* Flushing the buffer */
- public void flushTaskHistories() {
- if (historyQueue.size() > 0) {
- synchronized (writerThread) {
- writerThread.needTaskFlush.set(true);
- writerThread.notifyAll();
+ /**
+ * flush all task histories
+ */
+ public WriterFuture flushTaskHistories() {
+
+ WriterFuture<WriterHolder> future = new WriterFuture<WriterHolder>(null) {
+ public void done(WriterHolder holder) {
+ for(WriterHolder writerHolder : taskWriters.values()) {
+ try {
+ writerHolder.flush();
+ } catch (IOException e) {
+ super.failed(e);
+ }
+
+ }
+ super.done(null);
}
- } else {
- writerThread.flushTaskHistories();
+ };
+
+ historyQueue.add(future);
+ synchronized (writerThread) {
+ writerThread.notifyAll();
}
+ return future;
}
public static FileSystem getNonCrcFileSystem(Path path, Configuration conf) throws IOException {
@@ -225,7 +239,6 @@
}
class WriterThread extends Thread {
- private AtomicBoolean needTaskFlush = new AtomicBoolean(false);
public void run() {
LOG.info("HistoryWriter_" + processName + " started.");
@@ -303,7 +316,11 @@
for (WriterFuture<WriterHolder> future : histories) {
History history = future.getHistory();
- switch (history.getHistoryType()) {
+
+ if(history == null) {
+ future.done(null);
+ } else {
+ switch (history.getHistoryType()) {
case TASK:
try {
future.done(writeTaskHistory((TaskHistory) history));
@@ -334,12 +351,10 @@
break;
default:
LOG.warn("Wrong history type: " + history.getHistoryType());
+ }
}
}
- if(needTaskFlush.getAndSet(false)){
- flushTaskHistories();
- }
return histories;
}