| package org.apache.helix.task; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Set; |
| |
| import org.apache.helix.integration.task.MockTask; |
| import org.apache.helix.integration.task.TaskTestBase; |
| import org.apache.helix.integration.task.TaskTestUtil; |
| import org.testng.Assert; |
| import org.testng.annotations.BeforeClass; |
| import org.testng.annotations.Test; |
| import org.apache.helix.TestHelper; |
| |
| public class TestGetLastScheduledTaskExecInfo extends TaskTestBase { |
| private final static String TASK_START_TIME_KEY = "START_TIME"; |
| private final static long INVALID_TIMESTAMP = -1L; |
| private static final long SHORT_EXECUTION_TIME = 10L; |
| private static final long LONG_EXECUTION_TIME = 99999999L; |
| private static final long DELETE_DELAY = 30000L; |
| |
| @BeforeClass |
| public void beforeClass() throws Exception { |
| setSingleTestEnvironment(); |
| super.beforeClass(); |
| } |
| |
| @Test |
| public void testGetLastScheduledTaskExecInfo() throws Exception { |
| // Start new queue that has one job with long tasks and record start time of the tasks |
| String queueName = TestHelper.getTestMethodName(); |
| // Create and start new queue that has one job with 5 tasks. |
| // Each task has a long execution time. |
| // Since NumConcurrentTasksPerInstance is equal to 2, here we wait until two tasks have |
| // been scheduled (expectedScheduledTime = 2). |
| List<Long> startTimesWithStuckTasks = setupTasks(queueName, 5, LONG_EXECUTION_TIME, 2); |
| // Wait till the job is in progress |
| _driver.pollForJobState(queueName, queueName + "_job_0", TaskState.IN_PROGRESS); |
| |
| // First two must be -1 (two tasks are stuck), and API call must return the last value (most |
| // recent timestamp) |
| Assert.assertEquals(startTimesWithStuckTasks.get(0).longValue(), INVALID_TIMESTAMP); |
| Assert.assertEquals(startTimesWithStuckTasks.get(1).longValue(), INVALID_TIMESTAMP); |
| |
| // Workflow will be stuck so its partition state will be Running |
| boolean hasQueueReachedDesiredState = TestHelper.verify(() -> { |
| Long lastScheduledTaskTs = _driver.getLastScheduledTaskTimestamp(queueName); |
| TaskExecutionInfo execInfo = _driver.getLastScheduledTaskExecutionInfo(queueName); |
| return (execInfo.getJobName().equals(queueName + "_job_0") |
| && execInfo.getTaskPartitionState() == TaskPartitionState.RUNNING |
| && execInfo.getStartTimeStamp().equals(lastScheduledTaskTs) |
| && startTimesWithStuckTasks.get(4).equals(lastScheduledTaskTs)); |
| }, TestHelper.WAIT_DURATION); |
| Assert.assertTrue(hasQueueReachedDesiredState); |
| |
| // Stop and delete the queue |
| _driver.stop(queueName); |
| _driver.deleteAndWaitForCompletion(queueName, DELETE_DELAY); |
| |
| // Start the new queue with new task configuration. |
| // Create and start new queue that has one job with 4 tasks. |
| // Each task has a short execution time. In the setupTasks we wait until all of the tasks have |
| // been scheduled (expectedScheduledTime = 4). |
| List<Long> startTimesFastTasks = setupTasks(queueName, 4, SHORT_EXECUTION_TIME, 4); |
| // Wait till the job is in progress or completed. Since the tasks have short execution time, we |
| // wait for either IN_PROGRESS or COMPLETED states |
| _driver.pollForJobState(queueName, queueName + "_job_0", TaskState.IN_PROGRESS, |
| TaskState.COMPLETED); |
| |
| hasQueueReachedDesiredState = TestHelper.verify(() -> { |
| Long lastScheduledTaskTs = _driver.getLastScheduledTaskTimestamp(queueName); |
| TaskExecutionInfo execInfo = _driver.getLastScheduledTaskExecutionInfo(queueName); |
| return (execInfo.getJobName().equals(queueName + "_job_0") |
| && execInfo.getTaskPartitionState() == TaskPartitionState.COMPLETED |
| && execInfo.getStartTimeStamp().equals(lastScheduledTaskTs) |
| && startTimesFastTasks.get(startTimesFastTasks.size() - 1).equals(lastScheduledTaskTs)); |
| }, TestHelper.WAIT_DURATION); |
| Assert.assertTrue(hasQueueReachedDesiredState); |
| } |
| |
| /** |
| * Helper method for gathering start times for all tasks. Returns start times in ascending order. |
| * Null start times |
| * are recorded as 0. |
| * @param jobQueueName name of the queue |
| * @param numTasks number of tasks to schedule |
| * @param taskTimeout duration of each task to be run for |
| * @param expectedScheduledTasks expected number of tasks that should be scheduled |
| * @return list of timestamps for all tasks in ascending order |
| * @throws Exception |
| */ |
| private List<Long> setupTasks(String jobQueueName, int numTasks, long taskTimeout, |
| int expectedScheduledTasks) throws Exception { |
| // Create a queue |
| JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(jobQueueName); |
| |
| // Create and enqueue a job |
| JobConfig.Builder jobConfig = new JobConfig.Builder(); |
| |
| // Create tasks |
| List<TaskConfig> taskConfigs = new ArrayList<>(); |
| for (int i = 0; i < numTasks; i++) { |
| taskConfigs |
| .add(new TaskConfig.Builder().setTaskId("task_" + i).setCommand(MockTask.TASK_COMMAND) |
| .addConfig(MockTask.JOB_DELAY, String.valueOf(taskTimeout)).build()); |
| } |
| // Run up to 2 tasks at a time |
| jobConfig.addTaskConfigs(taskConfigs).setNumConcurrentTasksPerInstance(2); |
| queueBuilder.enqueueJob("job_0", jobConfig); |
| _driver.start(queueBuilder.build()); |
| |
| _driver.pollForWorkflowState(jobQueueName, TaskState.IN_PROGRESS); |
| |
| boolean haveExpectedNumberOfTasksScheduled = TestHelper.verify(() -> { |
| int scheduleTask = 0; |
| WorkflowConfig workflowConfig = |
| TaskUtil.getWorkflowConfig(_manager.getHelixDataAccessor(), jobQueueName); |
| for (String job : workflowConfig.getJobDag().getAllNodes()) { |
| JobContext jobContext = _driver.getJobContext(job); |
| Set<Integer> allPartitions = jobContext.getPartitionSet(); |
| for (Integer partition : allPartitions) { |
| String timestamp = jobContext.getMapField(partition).get(TASK_START_TIME_KEY); |
| if (timestamp != null) { |
| scheduleTask++; |
| } |
| } |
| } |
| return (scheduleTask == expectedScheduledTasks); |
| }, TestHelper.WAIT_DURATION); |
| Assert.assertTrue(haveExpectedNumberOfTasksScheduled); |
| |
| // Pull jobContexts and look at the start times |
| List<Long> startTimes = new ArrayList<>(); |
| WorkflowConfig workflowConfig = |
| TaskUtil.getWorkflowConfig(_manager.getHelixDataAccessor(), jobQueueName); |
| for (String job : workflowConfig.getJobDag().getAllNodes()) { |
| JobContext jobContext = _driver.getJobContext(job); |
| Set<Integer> allPartitions = jobContext.getPartitionSet(); |
| for (Integer partition : allPartitions) { |
| String timestamp = jobContext.getMapField(partition).get(TASK_START_TIME_KEY); |
| if (timestamp == null) { |
| startTimes.add(INVALID_TIMESTAMP); |
| } else { |
| startTimes.add(Long.parseLong(timestamp)); |
| } |
| } |
| } |
| Collections.sort(startTimes); |
| return startTimes; |
| } |
| } |