[Fix-17613] [Master] Task group queue priority always remains 0 (#17614)
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
index 45d5738..5205dfc 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskDefinition.java
@@ -206,7 +206,7 @@
*/
private int taskGroupId;
/**
- * task group id
+ * task group priority. TODO: this field should also be set on the task instance when the task instance is created
*/
private int taskGroupPriority;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java
index 75bb634..6fd885a 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/ITaskGroupCoordinator.java
@@ -19,6 +19,7 @@
import org.apache.dolphinscheduler.common.enums.Flag;
import org.apache.dolphinscheduler.common.enums.TaskGroupQueueStatus;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -66,9 +67,10 @@
* The TaskInstance shouldn't dispatch until there exist available slot, the taskGroupCoordinator notify it.
*
* @param taskInstance the task instance which want to acquire task group slot.
+ * @param taskDefinition the task definition which provides the task group priority.
* @throws IllegalArgumentException if the taskInstance is null or the used taskGroup doesn't exist.
*/
- void acquireTaskGroupSlot(TaskInstance taskInstance);
+ void acquireTaskGroupSlot(TaskInstance taskInstance, TaskDefinition taskDefinition);
/**
* If the TaskInstance is using TaskGroup then it need to release TaskGroupSlot.
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
index 4fe69e9..c4cf3c1 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinator.java
@@ -23,6 +23,7 @@
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskGroup;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -334,7 +335,7 @@
}
@Override
- public void acquireTaskGroupSlot(TaskInstance taskInstance) {
+ public void acquireTaskGroupSlot(TaskInstance taskInstance, TaskDefinition taskDefinition) {
if (taskInstance == null || taskInstance.getTaskGroupId() <= 0) {
throw new IllegalArgumentException("The current TaskInstance does not use task group");
}
@@ -353,7 +354,7 @@
.taskName(taskInstance.getName())
.groupId(taskInstance.getTaskGroupId())
.workflowInstanceId(taskInstance.getWorkflowInstanceId())
- .priority(taskInstance.getTaskGroupPriority())
+ .priority(taskDefinition.getTaskGroupPriority())
.inQueue(Flag.YES.getCode())
.forceStart(Flag.NO.getCode())
.status(TaskGroupQueueStatus.WAIT_QUEUE)
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java
index b2e4c33..50b4e15 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/runnable/TaskExecutionRunnable.java
@@ -178,7 +178,7 @@
// larger number, higher priority
int taskGroupPriorityCompareResult =
- taskInstance.getTaskGroupPriority() - other.getTaskInstance().getTaskGroupPriority();
+ taskDefinition.getTaskGroupPriority() - other.getTaskDefinition().getTaskGroupPriority();
if (taskGroupPriorityCompareResult != 0) {
return -taskGroupPriorityCompareResult;
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
index e451cc7..ad0e652 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java
@@ -21,6 +21,7 @@
import static org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus.DISPATCH;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao;
@@ -84,7 +85,8 @@
*/
protected void acquireTaskGroupSlot(final ITaskExecutionRunnable taskExecutionRunnable) {
final TaskInstance taskInstance = taskExecutionRunnable.getTaskInstance();
- taskGroupCoordinator.acquireTaskGroupSlot(taskInstance);
+ final TaskDefinition taskDefinition = taskExecutionRunnable.getTaskDefinition();
+ taskGroupCoordinator.acquireTaskGroupSlot(taskInstance, taskDefinition);
}
/**
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java
index 4568e32..6f4b7d5 100644
--- a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/engine/TaskGroupCoordinatorTest.java
@@ -26,6 +26,7 @@
import static org.mockito.Mockito.when;
import org.apache.dolphinscheduler.common.enums.Flag;
+import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.TaskGroup;
import org.apache.dolphinscheduler.dao.entity.TaskGroupQueue;
import org.apache.dolphinscheduler.dao.entity.TaskInstance;
@@ -110,13 +111,15 @@
void acquireTaskGroupSlot() {
// TaskInstance is NULL
IllegalArgumentException illegalArgumentException =
- assertThrows(IllegalArgumentException.class, () -> taskGroupCoordinator.acquireTaskGroupSlot(null));
+ assertThrows(IllegalArgumentException.class,
+ () -> taskGroupCoordinator.acquireTaskGroupSlot(null, null));
assertEquals("The current TaskInstance does not use task group", illegalArgumentException.getMessage());
+ TaskDefinition taskDefinition = new TaskDefinition();
// TaskGroupId is NULL
TaskInstance taskInstance = new TaskInstance();
illegalArgumentException = assertThrows(IllegalArgumentException.class,
- () -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance));
+ () -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance, taskDefinition));
assertEquals("The current TaskInstance does not use task group", illegalArgumentException.getMessage());
// TaskGroup not exist
@@ -124,12 +127,12 @@
taskInstance.setId(1);
when(taskGroupDao.queryById(taskInstance.getTaskGroupId())).thenReturn(null);
illegalArgumentException = assertThrows(IllegalArgumentException.class,
- () -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance));
+ () -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance, taskDefinition));
assertEquals("The current TaskGroup: 1 does not exist", illegalArgumentException.getMessage());
// TaskGroup exist
when(taskGroupDao.queryById(taskInstance.getTaskGroupId())).thenReturn(new TaskGroup());
- Assertions.assertDoesNotThrow(() -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance));
+ Assertions.assertDoesNotThrow(() -> taskGroupCoordinator.acquireTaskGroupSlot(taskInstance, taskDefinition));
}
diff --git a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_fake_task_using_task_group.yaml b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_fake_task_using_task_group.yaml
index d846e0f..e798e33 100644
--- a/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_fake_task_using_task_group.yaml
+++ b/dolphinscheduler-master/src/test/resources/it/start/workflow_with_one_fake_task_using_task_group.yaml
@@ -46,6 +46,7 @@
taskParams: '{"localParams":null,"varPool":[],"shellScript":"sleep 5"}'
workerGroup: default
taskGroupId: 1
+ taskGroupPriority: 1
createTime: 2024-08-12 00:00:00
updateTime: 2021-08-12 00:00:00
taskExecuteType: BATCH