TAJO-2143: Fix race condition in task history writer.

Closes #1013
diff --git a/CHANGES b/CHANGES
index ad9ca05..c653d99 100644
--- a/CHANGES
+++ b/CHANGES
@@ -142,6 +142,8 @@
 
   BUG FIXES
 
+    TAJO-2143: Fix race condition in task history writer. (jinho)
+
     TAJO-2140: TajoInternalError does not contains reason stack trace. (jinho)
 
     TAJO-2135: Invalid join result when join key columns contain nulls. (jihoon)
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java b/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
index 8b33d02..132def7 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
@@ -39,6 +39,7 @@
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.*;
 
@@ -283,7 +284,10 @@
       TaskAttemptId id2 = TajoIdUtils.parseTaskAttemptId("ta_1412326813565_0001_000001_000002_00");
       org.apache.tajo.worker.TaskHistory taskHistory2 = new org.apache.tajo.worker.TaskHistory(
           id2, TaskAttemptState.TA_SUCCEEDED, 1.0f, startTime, System.currentTimeMillis() - 500, tableStats);
-      writer.appendAndSync(taskHistory2);
+      writer.appendHistory(taskHistory2);
+
+      HistoryWriter.WriterFuture future = writer.flushTaskHistories();
+      future.get(10, TimeUnit.SECONDS);
 
       SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
       String startDate = df.format(new Date(startTime));
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
index bc62cac..668940c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
@@ -175,16 +175,30 @@
     }
   }
 
-  /* Flushing the buffer */
-  public void flushTaskHistories() {
-    if (historyQueue.size() > 0) {
-      synchronized (writerThread) {
-        writerThread.needTaskFlush.set(true);
-        writerThread.notifyAll();
+  /**
+   *  flush all task histories
+   */
+  public WriterFuture flushTaskHistories() {
+
+    WriterFuture<WriterHolder> future = new WriterFuture<WriterHolder>(null) {
+      public void done(WriterHolder holder) {
+        for(WriterHolder writerHolder : taskWriters.values()) {
+          try {
+            writerHolder.flush();
+          } catch (IOException e) {
+            super.failed(e);
+          }
+
+        }
+        super.done(null);
       }
-    } else {
-      writerThread.flushTaskHistories();
+    };
+
+    historyQueue.add(future);
+    synchronized (writerThread) {
+      writerThread.notifyAll();
     }
+    return future;
   }
 
   public static FileSystem getNonCrcFileSystem(Path path, Configuration conf) throws IOException {
@@ -225,7 +239,6 @@
   }
 
   class WriterThread extends Thread {
-    private AtomicBoolean needTaskFlush = new AtomicBoolean(false);
 
     public void run() {
       LOG.info("HistoryWriter_" + processName + " started.");
@@ -303,7 +316,11 @@
 
       for (WriterFuture<WriterHolder> future : histories) {
         History history = future.getHistory();
-        switch (history.getHistoryType()) {
+
+        if(history == null) {
+          future.done(null);
+        } else {
+          switch (history.getHistoryType()) {
           case TASK:
             try {
               future.done(writeTaskHistory((TaskHistory) history));
@@ -334,12 +351,10 @@
             break;
           default:
             LOG.warn("Wrong history type: " + history.getHistoryType());
+          }
         }
       }
 
-      if(needTaskFlush.getAndSet(false)){
-        flushTaskHistories();
-      }
       return histories;
     }