Fix data race in getting results from MSQ select tasks. (#16107)
* Fix data race in getting results from MSQ select tasks.
* Add better logging
* Handling number overflow.
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index d62bcce..5da74f0 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -462,9 +462,27 @@
log.warn("Worker: %s", MSQTasks.errorReportToLogMessage(workerError));
}
}
-
+ MSQResultsReport resultsReport = null;
if (queryKernel != null && queryKernel.isSuccess()) {
// If successful, encourage the tasks to exit successfully.
+ // get results before posting finish to the tasks.
+ if (resultsYielder != null) {
+ resultsReport = makeResultsTaskReport(
+ queryDef,
+ resultsYielder,
+ task.getQuerySpec().getColumnMappings(),
+ task.getSqlTypeNames(),
+ MultiStageQueryContext.getSelectDestination(task.getQuerySpec().getQuery().context())
+ );
+ try {
+ resultsYielder.close();
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Unable to fetch results of various worker tasks successfully", e);
+ }
+ } else {
+ resultsReport = null;
+ }
postFinishToAllTasks();
workerTaskLauncher.stop(false);
} else {
@@ -509,7 +527,6 @@
try {
// Write report even if something went wrong.
final MSQStagesReport stagesReport;
- final MSQResultsReport resultsReport;
if (queryDef != null) {
final Map<Integer, ControllerStagePhase> stagePhaseMap;
@@ -538,18 +555,6 @@
stagesReport = null;
}
- if (resultsYielder != null) {
- resultsReport = makeResultsTaskReport(
- queryDef,
- resultsYielder,
- task.getQuerySpec().getColumnMappings(),
- task.getSqlTypeNames(),
- MultiStageQueryContext.getSelectDestination(task.getQuerySpec().getQuery().context())
- );
- } else {
- resultsReport = null;
- }
-
final MSQTaskReportPayload taskReportPayload = new MSQTaskReportPayload(
makeStatusReport(
taskStateForReport,
@@ -564,7 +569,6 @@
countersSnapshot,
resultsReport
);
-
context.writeReports(
id(),
TaskReport.buildTaskReports(new MSQTaskReport(id(), taskReportPayload))
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java
index db46b42..b96ce46 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/report/MSQResultsReport.java
@@ -28,8 +28,10 @@
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.java.util.common.guava.Yielder;
import org.apache.druid.java.util.common.guava.Yielders;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.msq.exec.Limits;
import org.apache.druid.msq.indexing.destination.MSQSelectDestination;
+import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.segment.column.ColumnType;
import javax.annotation.Nullable;
@@ -39,6 +41,7 @@
public class MSQResultsReport
{
+ private static final Logger log = new Logger(MSQResultsReport.class);
/**
* Like {@link org.apache.druid.segment.column.RowSignature}, but allows duplicate column names for compatibility
* with SQL (which also allows duplicate column names in query results).
@@ -83,18 +86,35 @@
MSQSelectDestination selectDestination
)
{
- if (selectDestination.shouldTruncateResultsInTaskReport()) {
- List<Object[]> results = new ArrayList<>();
- int rowCount = 0;
- while (!resultYielder.isDone() && rowCount < Limits.MAX_SELECT_RESULT_ROWS) {
- results.add(resultYielder.get());
- resultYielder = resultYielder.next(null);
- ++rowCount;
+ List<Object[]> results = new ArrayList<>();
+ long rowCount = 0;
+ int factor = 1;
+ while (!resultYielder.isDone()) {
+ results.add(resultYielder.get());
+ resultYielder = resultYielder.next(null);
+ ++rowCount;
+ if (selectDestination.shouldTruncateResultsInTaskReport() && rowCount >= Limits.MAX_SELECT_RESULT_ROWS) {
+ break;
}
- return new MSQResultsReport(signature, sqlTypeNames, Yielders.each(Sequences.simple(results)), !resultYielder.isDone());
- } else {
- return new MSQResultsReport(signature, sqlTypeNames, resultYielder, false);
+ if (rowCount % (factor * Limits.MAX_SELECT_RESULT_ROWS) == 0) {
+ log.warn(
+ "Task report is getting too large with %d rows. Large task reports can cause the controller to go out of memory. "
+ + "Consider using the 'limit %d' clause in your query to reduce the number of rows in the result. "
+ + "If you require all the results, consider setting [%s=%s] in the query context which will allow you to fetch large result sets.",
+ rowCount,
+ Limits.MAX_SELECT_RESULT_ROWS,
+ MultiStageQueryContext.CTX_SELECT_DESTINATION,
+ MSQSelectDestination.DURABLESTORAGE.getName()
+ );
+ factor = factor < 32 ? factor * 2 : 32;
+ }
}
+ return new MSQResultsReport(
+ signature,
+ sqlTypeNames,
+ Yielders.each(Sequences.simple(results)),
+ !resultYielder.isDone()
+ );
}
@JsonProperty("signature")