package org.apache.helix.controller.stages;

import org.apache.helix.AccessOption;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.common.caches.TaskDataCache;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.controller.stages.task.TaskPersistDataStage;
import org.apache.helix.integration.task.TaskTestBase;
import org.apache.helix.integration.task.TaskTestUtil;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobQueue;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.task.TaskPartitionState;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.WorkflowContext;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

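/**
 * Tests the task-related controller pipeline stages in isolation: contexts staged in the
 * cluster's data provider are flushed to ZooKeeper by {@link TaskPersistDataStage}, and
 * leftover job ZNodes are cleaned up by {@link TaskGarbageCollectionStage}.
 */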
public class TestTaskStage extends TaskTestBase {
  private ClusterEvent _event =
      new ClusterEvent(CLUSTER_NAME, ClusterEventType.CurrentStateChange);
  private PropertyKey.Builder _keyBuilder;
  private String _testWorkflow = TestHelper.getTestClassName();
  private String _testJobPrefix = _testWorkflow + "_Job_";

  @BeforeClass
  public void beforeClass() throws Exception {
    super.beforeClass();
    // Stop the controller for isolated testing of the stages
    _controller.syncStop();
    _keyBuilder = _manager.getHelixDataAccessor().keyBuilder();
  }

  @Test
  public void testPersistContextData() {
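    // Attach the HelixManager to the event, as the controller pipeline would before
    // running its stages.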
    _event.addAttribute(AttributeName.helixmanager.name(), _manager);
    WorkflowControllerDataProvider cache = new WorkflowControllerDataProvider(CLUSTER_NAME);
    TaskDataCache taskDataCache = cache.getTaskDataCache();

    // Build a queue
    JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(_testWorkflow);
    JobConfig.Builder jobBuilder_0 =
        new JobConfig.Builder().setJobId("Job_0").setTargetResource("1").setCommand("1");
    JobConfig.Builder jobBuilder_1 =
        new JobConfig.Builder().setJobId("Job_1").setTargetResource("1").setCommand("1");
    JobConfig.Builder jobBuilder_2 =
        new JobConfig.Builder().setJobId("Job_2").setTargetResource("1").setCommand("1");
    queueBuilder.enqueueJob("Job_0", jobBuilder_0).enqueueJob("Job_1", jobBuilder_1)
        .enqueueJob("Job_2", jobBuilder_2);
    _driver.createQueue(queueBuilder.build());

    // Manually trigger a cache refresh
    cache.refresh(new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor));

    // Create the IdealState ZNodes for the jobs
    _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, _testJobPrefix + "0", 1,
        TaskConstants.STATE_MODEL_NAME);
    _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, _testJobPrefix + "1", 1,
        TaskConstants.STATE_MODEL_NAME);
    _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, _testJobPrefix + "2", 1,
        TaskConstants.STATE_MODEL_NAME);
    // Create the contexts
    WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(_testWorkflow));
    wfCtx.setJobState(_testJobPrefix + "0", TaskState.COMPLETED);
    wfCtx.setJobState(_testJobPrefix + "1", TaskState.COMPLETED);
    wfCtx.setWorkflowState(TaskState.IN_PROGRESS);
    wfCtx.setName(_testWorkflow);
    wfCtx.setStartTime(System.currentTimeMillis());

    JobContext jbCtx0 = new JobContext(new ZNRecord(_testJobPrefix + "0"));
    jbCtx0.setName(_testJobPrefix + "0");
    jbCtx0.setStartTime(System.currentTimeMillis());
    jbCtx0.setPartitionState(0, TaskPartitionState.COMPLETED);

    JobContext jbCtx1 = new JobContext(new ZNRecord(_testJobPrefix + "1"));
    jbCtx1.setName(_testJobPrefix + "1");
    jbCtx1.setStartTime(System.currentTimeMillis());
    jbCtx1.setPartitionState(0, TaskPartitionState.COMPLETED);
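
    // Stage the contexts in the data cache; the persist stage below flushes them to ZK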
    taskDataCache.updateWorkflowContext(_testWorkflow, wfCtx);
    taskDataCache.updateJobContext(_testJobPrefix + "0", jbCtx0);
    taskDataCache.updateJobContext(_testJobPrefix + "1", jbCtx1);
    _event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);

    // Write contexts to ZK first
    TaskPersistDataStage persistDataStage = new TaskPersistDataStage();
    persistDataStage.process(_event);

    Assert.assertNotNull(_driver.getWorkflowContext(_testWorkflow));
    Assert.assertNotNull(_driver.getJobContext(_testJobPrefix + "0"));
    Assert.assertNotNull(_driver.getJobContext(_testJobPrefix + "1"));
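
    // Mutate the cached state: fail Job_0, drop Job_1's context entirely, and add a
    // fresh context for Job_2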
    jbCtx0.setPartitionState(0, TaskPartitionState.ERROR);
    wfCtx.setJobState(_testJobPrefix + "0", TaskState.FAILED);
    taskDataCache.updateJobContext(_testJobPrefix + "0", jbCtx0);

    wfCtx.getJobStates().remove(_testJobPrefix + "1");
    taskDataCache.removeContext(_testJobPrefix + "1");

    JobContext jbCtx2 = new JobContext(new ZNRecord(_testJobPrefix + "2"));
    jbCtx2.setName(_testJobPrefix + "2");
    jbCtx2.setPartitionState(1, TaskPartitionState.INIT);
    wfCtx.setJobState(_testJobPrefix + "2", TaskState.IN_PROGRESS);
    taskDataCache.updateJobContext(_testJobPrefix + "2", jbCtx2);
    taskDataCache.updateWorkflowContext(_testWorkflow, wfCtx);
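
    // A second pass should persist the updates and delete the removed context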
    persistDataStage.process(_event);

    Assert.assertEquals(_driver.getWorkflowContext(_testWorkflow), wfCtx);
    Assert.assertEquals(_driver.getJobContext(_testJobPrefix + "0"), jbCtx0);
    Assert.assertEquals(_driver.getJobContext(_testJobPrefix + "2"), jbCtx2);
    Assert.assertNull(_driver.getJobContext(_testJobPrefix + "1"));
  }

  /**
   * Tests that if a job in the DAG has lost its JobConfig (e.g. due to a partial ZK delete
   * failure), the async job purge will try to delete its remaining ZNodes again.
   */
  @Test(dependsOnMethods = "testPersistContextData")
  public void testPartialDataPurge() throws Exception {
    // Manually delete the JobConfigs
    deleteJobConfigs(_testWorkflow, _testJobPrefix + "0");
    deleteJobConfigs(_testWorkflow, _testJobPrefix + "1");
    deleteJobConfigs(_testWorkflow, _testJobPrefix + "2");

    // Then purge jobs
    TaskGarbageCollectionStage garbageCollectionStage = new TaskGarbageCollectionStage();
    garbageCollectionStage.execute(_event);

    // Check that the IdealStates and contexts have been purged for all three jobs, in
    // both the old and new ZNode paths
    checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "0");
    checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "1");
    checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "2");
  }
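
  /**
   * Removes a job's JobConfig from both the legacy resource-config path and the newer
   * per-workflow ZNode path.
   */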
  private void deleteJobConfigs(String workflowName, String jobName) {
    String oldPath =
        _manager.getHelixDataAccessor().keyBuilder().resourceConfig(jobName).getPath();
    String newPath = _manager.getHelixDataAccessor().keyBuilder()
        .jobConfigZNode(workflowName, jobName).getPath();
    _baseAccessor.remove(oldPath, AccessOption.PERSISTENT);
    _baseAccessor.remove(newPath, AccessOption.PERSISTENT);
  }
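
  /**
   * Asserts that the job's IdealState and JobContext ZNodes (old and new paths) are gone,
   * polling with TestHelper.verify because the purge runs asynchronously.
   */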
  private void checkForIdealStateAndContextRemoval(String workflow, String job)
      throws Exception {
    // IdealState: poll rather than assert immediately, since the purge is scheduled
    // asynchronously
    Assert.assertTrue(TestHelper.verify(() -> !_baseAccessor
        .exists(_keyBuilder.idealStates(job).getPath(), AccessOption.PERSISTENT),
        TestHelper.WAIT_DURATION));

    // JobContexts in the old and new ZNode paths
    String oldPath =
        String.format("/%s/PROPERTYSTORE/TaskRebalancer/%s/Context", CLUSTER_NAME, job);
    String newPath = _keyBuilder.jobContextZNode(workflow, job).getPath();
    Assert.assertTrue(TestHelper.verify(
        () -> !_baseAccessor.exists(oldPath, AccessOption.PERSISTENT)
            && !_baseAccessor.exists(newPath, AccessOption.PERSISTENT),
        TestHelper.WAIT_DURATION));
  }
}