Fix data race in getting results from MSQ select tasks. (#16107)

* Fix data race in getting results from MSQ select tasks.

* Add better logging

* Handle number overflow.
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index d62bcce..5da74f0 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -462,9 +462,27 @@
         log.warn("Worker: %s", MSQTasks.errorReportToLogMessage(workerError));
       }
     }
-
+    MSQResultsReport resultsReport = null;
     if (queryKernel != null && queryKernel.isSuccess()) {
       // If successful, encourage the tasks to exit successfully.
+      // get results before posting finish to the tasks.
+      if (resultsYielder != null) {
+        resultsReport = makeResultsTaskReport(
+            queryDef,
+            resultsYielder,
+            task.getQuerySpec().getColumnMappings(),
+            task.getSqlTypeNames(),
+            MultiStageQueryContext.getSelectDestination(task.getQuerySpec().getQuery().context())
+        );
+        try {
+          resultsYielder.close();
+        }
+        catch (IOException e) {
+          throw new RuntimeException("Unable to fetch results of various worker tasks successfully", e);
+        }
+      } else {
+        resultsReport = null;
+      }
       postFinishToAllTasks();
       workerTaskLauncher.stop(false);
     } else {
@@ -509,7 +527,6 @@
     try {
       // Write report even if something went wrong.
       final MSQStagesReport stagesReport;
-      final MSQResultsReport resultsReport;
 
       if (queryDef != null) {
         final Map<Integer, ControllerStagePhase> stagePhaseMap;
@@ -538,18 +555,6 @@
         stagesReport = null;
       }
 
-      if (resultsYielder != null) {
-        resultsReport = makeResultsTaskReport(
-            queryDef,
-            resultsYielder,
-            task.getQuerySpec().getColumnMappings(),
-            task.getSqlTypeNames(),
-            MultiStageQueryContext.getSelectDestination(task.getQuerySpec().getQuery().context())
-        );
-      } else {
-        resultsReport = null;
-      }
-
       final MSQTaskReportPayload taskReportPayload = new MSQTaskReportPayload(
           makeStatusReport(
               taskStateForReport,
@@ -564,7 +569,6 @@
           countersSnapshot,
           resultsReport
       );
-
       context.writeReports(
           id(),
           TaskReport.buildTaskReports(new MSQTaskReport(id(), taskReportPayload))
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java
index db46b42..b96ce46 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java
@@ -28,8 +28,10 @@
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.java.util.common.guava.Yielder;
 import org.apache.druid.java.util.common.guava.Yielders;
+import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.msq.exec.Limits;
 import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
+import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.segment.column.ColumnType;
 
 import javax.annotation.Nullable;
@@ -39,6 +41,7 @@
 
 public class MSQResultsReport
 {
+  private static final Logger log = new Logger(MSQResultsReport.class);
   /**
    * Like {@link org.apache.druid.segment.column.RowSignature}, but allows duplicate column names for compatibility
    * with SQL (which also allows duplicate column names in query results).
@@ -83,18 +86,35 @@
       MSQSelectDestination selectDestination
   )
   {
-    if (selectDestination.shouldTruncateResultsInTaskReport()) {
-      List<Object[]> results = new ArrayList<>();
-      int rowCount = 0;
-      while (!resultYielder.isDone() && rowCount < Limits.MAX_SELECT_RESULT_ROWS) {
-        results.add(resultYielder.get());
-        resultYielder = resultYielder.next(null);
-        ++rowCount;
+    List<Object[]> results = new ArrayList<>();
+    long rowCount = 0;
+    int factor = 1;
+    while (!resultYielder.isDone()) {
+      results.add(resultYielder.get());
+      resultYielder = resultYielder.next(null);
+      ++rowCount;
+      if (selectDestination.shouldTruncateResultsInTaskReport() && rowCount >= Limits.MAX_SELECT_RESULT_ROWS) {
+        break;
       }
-      return new MSQResultsReport(signature, sqlTypeNames, Yielders.each(Sequences.simple(results)), !resultYielder.isDone());
-    } else {
-      return new MSQResultsReport(signature, sqlTypeNames, resultYielder, false);
+      if (rowCount % (factor * Limits.MAX_SELECT_RESULT_ROWS) == 0) {
+        log.warn(
+            "Task report is getting too large with %d rows. Large task reports can cause the controller to go out of memory. "
+            + "Consider using the 'limit %d' clause in your query to reduce the number of rows in the result. "
+            + "If you require all the results, consider setting [%s=%s] in the query context which will allow you to fetch large result sets.",
+            rowCount,
+            Limits.MAX_SELECT_RESULT_ROWS,
+            MultiStageQueryContext.CTX_SELECT_DESTINATION,
+            MSQSelectDestination.DURABLESTORAGE.getName()
+        );
+        factor = factor < 32 ? factor * 2 : 32;
+      }
     }
+    return new MSQResultsReport(
+        signature,
+        sqlTypeNames,
+        Yielders.each(Sequences.simple(results)),
+        !resultYielder.isDone()
+    );
   }
 
   @JsonProperty("signature")