[CARBONDATA-3958] Avoid blocking the loading task when the output queue poll timeout

Why is this PR needed?
In some cases, the CDC merge tasks are blocked when data loading uses
CarbonTableOutputFormat.getRecordWriter method. PR #3856 change the code to avoid using
CarbonTableOutputFormat.getRecordWriter, CDC merge will not happen this issue again.
But this issue maybe happen in other similar scenarios.

Because the poll method of the queue is time out in some cases, so the row batch in
the queue will not be polled again.

After the queue is full, the queue blocks the writing task puts a new batch. Even if
the queue is not full, it will also lose the row batch in the queue.

What changes were proposed in this PR?
If the output is not closed, it will poll a row batch in the loops till it gets a not
null batch.
If the output is closed, it will break the loop.

Does this PR introduce any user interface change?
No

Is any new testcase added?
No

This closes #3897
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
index 3e29288..dc9aa54 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/iterator/CarbonOutputIteratorWrapper.java
@@ -63,18 +63,33 @@
   @Override
   public boolean hasNext() {
     if (readBatch == null || !readBatch.hasNext()) {
+      // if readBatch don't have next row, set it to null
+      readBatch = null;
       try {
-        if (!close) {
+        // if the output is not closed, poll a batch from the queue in a loop.
+        // if the queue is always empty, it will wait for the last default element of the output,
+        // after that, the output will be closed and the loop will be finished.
+        while (!close) {
           readBatch = queue.poll(5, TimeUnit.MINUTES);
           if (readBatch == null) {
-            LOG.warn("This scenario should not happen");
-            return false;
+            LOG.warn("try to poll a row batch again.");
+          } else {
+            // if readBatch is not null, break this loop to continue
+            break;
           }
-        } else {
+        }
+
+        // when the output is closed and readBatch is null, should poll a batch immediately.
+        // it is a double-checking also of the last poll operation. in some cases, the output is
+        // closed and the readBatch is null, but the queue is not empty, contain the last load
+        // batch or the last default batch.
+        if (close && readBatch == null) {
+          LOG.warn("try to poll a row batch one more time.");
           readBatch = queue.poll();
-          if (readBatch == null) {
-            return false;
-          }
+        }
+        // checking again whether readBatch is null or not
+        if (readBatch == null) {
+          return false;
         }
       } catch (InterruptedException e) {
         throw new RuntimeException(e);
@@ -112,13 +127,14 @@
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
-    close = true;
     // It is required if the thread waits for take.
     if (queue.isEmpty()) {
       if (!queue.offer(new RowBatch(0))) {
         LOG.warn("The default last element is not added to queue");
       }
     }
+    // after try to add the default last element, close the output.
+    close = true;
   }
 
   private static class RowBatch extends CarbonIterator<Object[]> {