package org.apache.helix.integration.task;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.common.DedupEventProcessor;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.controller.pipeline.AsyncWorkerType;
import org.apache.helix.controller.pipeline.Stage;
import org.apache.helix.controller.pipeline.StageContext;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.ClusterEventType;
import org.apache.helix.controller.stages.CurrentStateComputationStage;
import org.apache.helix.controller.stages.ReadClusterDataStage;
import org.apache.helix.controller.stages.ResourceComputationStage;
import org.apache.helix.controller.stages.TaskGarbageCollectionStage;
import org.apache.helix.controller.stages.task.TaskPersistDataStage;
import org.apache.helix.controller.stages.task.TaskSchedulingStage;
import org.apache.helix.model.Message;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobQueue;
import org.apache.helix.task.ScheduleConfig;
import org.apache.helix.task.TargetState;
import org.apache.helix.task.TaskDriver;
import org.apache.helix.task.TaskPartitionState;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
import org.testng.Assert;
/**
* Static test utility methods.
*/
public class TaskTestUtil {
public static final String JOB_KW = "JOB";
private static final int DEFAULT_TIMEOUT = 2 * 60 * 1000; /* 2 mins */
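/**
* Polls until the named job either has no state recorded in its workflow context or is
* NOT_STARTED, asserting that this holds within the default timeout.
*/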
public static void pollForEmptyJobState(final TaskDriver driver, final String workflowName,
final String jobName) throws Exception {
final String namespacedJobName = String.format("%s_%s", workflowName, jobName);
boolean succeed = TestHelper.verify(new TestHelper.Verifier() {
@Override
public boolean verify() throws Exception {
WorkflowContext ctx = driver.getWorkflowContext(workflowName);
return ctx == null || ctx.getJobState(namespacedJobName) == null
|| ctx.getJobState(namespacedJobName) == TaskState.NOT_STARTED;
}
}, DEFAULT_TIMEOUT);
Assert.assertTrue(succeed);
}
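/**
* Polls until a {@link WorkflowContext} exists for the given workflow resource and returns it,
* asserting non-null after the default timeout elapses.
*/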
public static WorkflowContext pollForWorkflowContext(TaskDriver driver, String workflowResource)
throws InterruptedException {
// Wait for completion.
long st = System.currentTimeMillis();
WorkflowContext ctx;
do {
ctx = driver.getWorkflowContext(workflowResource);
Thread.sleep(100);
} while (ctx == null && System.currentTimeMillis() < st + DEFAULT_TIMEOUT);
Assert.assertNotNull(ctx);
return ctx;
}
// Verifies the workflow's parallel execution behavior:
// 1. Different jobs in the same workflow are RUNNING at the same time.
// 2. When overlap assignment is disallowed, no two jobs in the same workflow are RUNNING on the
// same instance at the same time.
// Use this method with caution: it assumes the workflow does not finish too quickly and that the
// number of tasks running in parallel can be counted.
public static boolean pollForWorkflowParallelState(TaskDriver driver, String workflowName)
throws InterruptedException {
WorkflowConfig workflowConfig = driver.getWorkflowConfig(workflowName);
Assert.assertNotNull(workflowConfig);
WorkflowContext workflowContext = null;
while (workflowContext == null) {
workflowContext = driver.getWorkflowContext(workflowName);
Thread.sleep(100);
}
int maxRunningCount = 0;
boolean finished = false;
while (!finished) {
finished = true;
int runningCount = 0;
workflowContext = driver.getWorkflowContext(workflowName);
for (String jobName : workflowConfig.getJobDag().getAllNodes()) {
TaskState jobState = workflowContext.getJobState(jobName);
if (jobState == TaskState.IN_PROGRESS) {
++runningCount;
finished = false;
}
}
if (runningCount > maxRunningCount) {
maxRunningCount = runningCount;
}
Thread.sleep(100);
}
List<JobContext> jobContextList = new ArrayList<>();
for (String jobName : workflowConfig.getJobDag().getAllNodes()) {
JobContext jobContext = driver.getJobContext(jobName);
if (jobContext != null) {
jobContextList.add(jobContext);
}
}
Map<String, List<long[]>> rangeMap = new HashMap<>();
if (!workflowConfig.isAllowOverlapJobAssignment()) {
for (JobContext jobContext : jobContextList) {
for (int partition : jobContext.getPartitionSet()) {
String instance = jobContext.getAssignedParticipant(partition);
if (!rangeMap.containsKey(instance)) {
rangeMap.put(instance, new ArrayList<long[]>());
}
rangeMap.get(instance).add(new long[] { jobContext.getPartitionStartTime(partition),
jobContext.getPartitionFinishTime(partition)
});
}
}
}
for (List<long[]> timeRange : rangeMap.values()) {
Collections.sort(timeRange, new Comparator<long[]>() {
@Override
public int compare(long[] o1, long[] o2) {
// Compare by start time; Long.compare avoids the overflow risk of casting a long delta to int.
return Long.compare(o1[0], o2[0]);
}
});
for (int i = 0; i < timeRange.size() - 1; i++) {
if (timeRange.get(i)[1] > timeRange.get(i + 1)[0]) {
return false;
}
}
}
return maxRunningCount > 1 && (!workflowConfig.isJobQueue()
|| maxRunningCount <= workflowConfig.getParallelJobs());
}
public static Date getDateFromStartTime(String startTime) {
int splitIndex = startTime.indexOf(':');
int hourOfDay = 0, minutes = 0;
try {
hourOfDay = Integer.parseInt(startTime.substring(0, splitIndex));
minutes = Integer.parseInt(startTime.substring(splitIndex + 1));
} catch (NumberFormatException e) {
// Ignore a malformed "HH:mm" string and fall back to 00:00.
}
Calendar cal = Calendar.getInstance();
cal.set(Calendar.HOUR_OF_DAY, hourOfDay);
cal.set(Calendar.MINUTE, minutes);
cal.set(Calendar.SECOND, 0);
cal.set(Calendar.MILLISECOND, 0);
return cal.getTime();
}
public static JobQueue.Builder buildRecurrentJobQueue(String jobQueueName, int delayStart) {
return buildRecurrentJobQueue(jobQueueName, delayStart, 60);
}
public static JobQueue.Builder buildRecurrentJobQueue(String jobQueueName, int delayStart,
int recurrenceInSeconds) {
return buildRecurrentJobQueue(jobQueueName, delayStart, recurrenceInSeconds, null);
}
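/**
* Builds a recurrent job queue whose schedule starts {@code delayStart} seconds from now and
* recurs every {@code recurrenceInSeconds}; a non-null {@code targetState} is applied to the
* workflow config.
*/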
public static JobQueue.Builder buildRecurrentJobQueue(String jobQueueName, int delayStart,
int recurrenceInSeconds, TargetState targetState) {
WorkflowConfig.Builder workflowCfgBuilder = new WorkflowConfig.Builder(jobQueueName);
workflowCfgBuilder.setExpiry(120000);
if (targetState != null) {
workflowCfgBuilder.setTargetState(targetState);
}
Calendar cal = Calendar.getInstance();
cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60);
cal.set(Calendar.SECOND, cal.get(Calendar.SECOND) + delayStart % 60);
cal.set(Calendar.MILLISECOND, 0);
ScheduleConfig scheduleConfig =
ScheduleConfig.recurringFromDate(cal.getTime(), TimeUnit.SECONDS, recurrenceInSeconds);
workflowCfgBuilder.setScheduleConfig(scheduleConfig);
return new JobQueue.Builder(jobQueueName).setWorkflowConfig(workflowCfgBuilder.build());
}
public static JobQueue.Builder buildRecurrentJobQueue(String jobQueueName) {
return buildRecurrentJobQueue(jobQueueName, 0);
}
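/**
* Builds a one-time job queue with the given capacity whose start is delayed by
* {@code delayStart} seconds; a positive {@code failureThreshold} is applied to the
* workflow config.
*/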
public static JobQueue.Builder buildJobQueue(String jobQueueName, int delayStart,
int failureThreshold, int capacity) {
WorkflowConfig.Builder workflowCfgBuilder = new WorkflowConfig.Builder(jobQueueName);
workflowCfgBuilder.setExpiry(120000);
workflowCfgBuilder.setCapacity(capacity);
Calendar cal = Calendar.getInstance();
cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60);
cal.set(Calendar.SECOND, cal.get(Calendar.SECOND) + delayStart % 60);
cal.set(Calendar.MILLISECOND, 0);
workflowCfgBuilder.setScheduleConfig(ScheduleConfig.oneTimeDelayedStart(cal.getTime()));
if (failureThreshold > 0) {
workflowCfgBuilder.setFailureThreshold(failureThreshold);
}
return new JobQueue.Builder(jobQueueName).setWorkflowConfig(workflowCfgBuilder.build());
}
public static JobQueue.Builder buildJobQueue(String jobQueueName, int delayStart,
int failureThreshold) {
return buildJobQueue(jobQueueName, delayStart, failureThreshold, 500);
}
public static JobQueue.Builder buildJobQueue(String jobQueueName) {
return buildJobQueue(jobQueueName, 0, 0, 500);
}
public static JobQueue.Builder buildJobQueue(String jobQueueName, int capacity) {
return buildJobQueue(jobQueueName, 0, 0, capacity);
}
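/**
* Builds an in-memory {@link WorkflowContext} with the given workflow state, a start time that
* defaults to now, and one namespaced job state per entry in {@code jobStates}.
*/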
public static WorkflowContext buildWorkflowContext(String workflowResource,
TaskState workflowState, Long startTime, TaskState... jobStates) {
WorkflowContext workflowContext =
new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
workflowContext.setName(workflowResource);
workflowContext.setStartTime(startTime == null ? System.currentTimeMillis() : startTime);
int jobId = 0;
for (TaskState jobstate : jobStates) {
workflowContext
.setJobState(TaskUtil.getNamespacedJobName(workflowResource, JOB_KW) + jobId++, jobstate);
}
workflowContext.setWorkflowState(workflowState);
return workflowContext;
}
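/**
* Builds an in-memory {@link JobContext} whose start and finish times default to now, with one
* partition state per entry in {@code partitionStates}.
*/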
public static JobContext buildJobContext(Long startTime, Long finishTime, TaskPartitionState... partitionStates) {
JobContext jobContext = new JobContext(new ZNRecord(TaskUtil.TASK_CONTEXT_KW));
jobContext.setStartTime(startTime == null ? System.currentTimeMillis() : startTime);
jobContext.setFinishTime(finishTime == null ? System.currentTimeMillis() : finishTime);
int partitionId = 0;
for (TaskPartitionState partitionState : partitionStates) {
jobContext.setPartitionState(partitionId++, partitionState);
}
return jobContext;
}
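/**
* Builds a {@link WorkflowControllerDataProvider} for the given cluster and refreshes it
* through the provided accessor.
*/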
public static WorkflowControllerDataProvider buildDataProvider(HelixDataAccessor accessor,
String clusterName) {
WorkflowControllerDataProvider cache = new WorkflowControllerDataProvider(clusterName);
cache.refresh(accessor);
return cache;
}
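/** Runs a single pipeline stage through its full lifecycle: init, preProcess, process, postProcess. */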
static void runStage(ClusterEvent event, Stage stage) throws Exception {
StageContext context = new StageContext();
stage.init(context);
stage.preProcess();
stage.process(event);
stage.postProcess();
}
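/**
* Runs the task pipeline (cluster data read, resource and current-state computation, task
* scheduling, persistence, and garbage collection) against a synthetic cluster event backed by
* the given cache, and returns the resulting {@link BestPossibleStateOutput}.
*/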
public static BestPossibleStateOutput calculateTaskSchedulingStage(WorkflowControllerDataProvider cache,
HelixManager manager) throws Exception {
ClusterEvent event = new ClusterEvent(ClusterEventType.Unknown);
event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
event.addAttribute(AttributeName.helixmanager.name(), manager);
event.addAttribute(AttributeName.PipelineType.name(), "TASK");
Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> asyncFIFOWorkerPool =
new HashMap<>();
DedupEventProcessor<String, Runnable> worker =
new DedupEventProcessor<String, Runnable>("ClusterName",
AsyncWorkerType.TaskJobPurgeWorker.name()) {
@Override
protected void handleEvent(Runnable event) {
// TODO: retry when queue is empty and event.run() failed?
event.run();
}
};
worker.start();
asyncFIFOWorkerPool.put(AsyncWorkerType.TaskJobPurgeWorker, worker);
event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), asyncFIFOWorkerPool);
List<Stage> stages = new ArrayList<>();
stages.add(new ReadClusterDataStage());
stages.add(new ResourceComputationStage());
stages.add(new CurrentStateComputationStage());
stages.add(new TaskSchedulingStage());
stages.add(new TaskPersistDataStage());
stages.add(new TaskGarbageCollectionStage());
for (Stage stage : stages) {
runStage(event, stage);
}
return event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
}
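/**
* Polls until the instance has exactly {@code numTask} pending messages, all of them
* INIT-to-RUNNING task transitions; returns false if this does not happen within {@code timeout}.
*/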
public static boolean pollForAllTasksBlock(HelixDataAccessor accessor, String instance, int numTask, long timeout)
throws InterruptedException {
PropertyKey propertyKey = accessor.keyBuilder().messages(instance);
long startTime = System.currentTimeMillis();
while (true) {
List<Message> messages = accessor.getChildValues(propertyKey);
if (allTasksBlock(messages, numTask)) {
return true;
} else if (startTime + timeout < System.currentTimeMillis()) {
return false;
} else {
Thread.sleep(100);
}
}
}
private static boolean allTasksBlock(List<Message> messages, int numTask) {
if (messages.size() != numTask) {
return false;
}
for (Message message : messages) {
if (!message.getFromState().equals(TaskPartitionState.INIT.name())
|| !message.getToState().equals(TaskPartitionState.RUNNING.name())) {
return false;
}
}
return true;
}
/**
* Extend this class to periodically check whether a defined condition holds.
* On timeout, the condition is checked one last time and that result is returned.
*/
public static abstract class Poller {
private static final long DEFAULT_TIME_OUT = 10 * 1000; // 10 seconds
public boolean poll() {
return poll(DEFAULT_TIME_OUT);
}
public boolean poll(long timeOut) {
long startTime = System.currentTimeMillis();
while (System.currentTimeMillis() < startTime + timeOut) {
if (check()) {
break;
}
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
return check();
}
public abstract boolean check();
}
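// A minimal Poller usage sketch (the driver and workflow name here are illustrative):
//
//   boolean appeared = new Poller() {
//     @Override
//     public boolean check() {
//       return driver.getWorkflowContext("myWorkflow") != null;
//     }
//   }.poll();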
}