package org.apache.helix.controller.stages;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.HashMap;
import java.util.Map;

import org.apache.helix.AccessOption;
import org.apache.helix.HelixConstants;
import org.apache.helix.PropertyKey;
import org.apache.helix.TestHelper;
import org.apache.helix.common.DedupEventProcessor;
import org.apache.helix.common.caches.TaskDataCache;
import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.controller.pipeline.AsyncWorkerType;
import org.apache.helix.controller.stages.task.TaskPersistDataStage;
import org.apache.helix.integration.task.TaskTestBase;
import org.apache.helix.integration.task.TaskTestUtil;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.JobQueue;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.task.TaskPartitionState;
import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.WorkflowContext;
import org.apache.helix.zookeeper.datamodel.ZNRecord;

import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
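
/**
 * Tests the task-related controller pipeline stages, {@link TaskPersistDataStage} and
 * TaskGarbageCollectionStage, in isolation: the controller is stopped in beforeClass() so each
 * stage can be driven manually against the ZooKeeper-backed test cluster set up by TaskTestBase.
 */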
public class TestTaskStage extends TaskTestBase {
private ClusterEvent _event = new ClusterEvent(CLUSTER_NAME, ClusterEventType.CurrentStateChange);
private PropertyKey.Builder _keyBuilder;
private String _testWorkflow = TestHelper.getTestClassName();
  private String _testJobPrefix = _testWorkflow + "_Job_";

@BeforeClass
public void beforeClass() throws Exception {
super.beforeClass();
// Stop the controller for isolated testing of the stage
_controller.syncStop();
_keyBuilder = _manager.getHelixDataAccessor().keyBuilder();
}
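
  /**
   * Verifies that TaskPersistDataStage persists new and updated workflow/job contexts from the
   * controller cache to ZooKeeper, and deletes contexts that have been removed from the cache.
   */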
@Test
public void testPersistContextData() {
_event.addAttribute(AttributeName.helixmanager.name(), _manager);
WorkflowControllerDataProvider cache = new WorkflowControllerDataProvider(CLUSTER_NAME);
TaskDataCache taskDataCache = cache.getTaskDataCache();
// Build a queue
JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(_testWorkflow);
    JobConfig.Builder jobBuilder_0 =
        new JobConfig.Builder().setJobId("Job_0").setTargetResource("1").setCommand("1");
    JobConfig.Builder jobBuilder_1 =
        new JobConfig.Builder().setJobId("Job_1").setTargetResource("1").setCommand("1");
    JobConfig.Builder jobBuilder_2 =
        new JobConfig.Builder().setJobId("Job_2").setTargetResource("1").setCommand("1");
queueBuilder.enqueueJob("Job_0", jobBuilder_0).enqueueJob("Job_1", jobBuilder_1)
.enqueueJob("Job_2", jobBuilder_2);
_driver.createQueue(queueBuilder.build());
// Manually trigger a cache refresh
cache.refresh(new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor));
    // Create the IdealState ZNodes for the jobs
_gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, _testJobPrefix + "0", 1,
TaskConstants.STATE_MODEL_NAME);
_gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, _testJobPrefix + "1", 1,
TaskConstants.STATE_MODEL_NAME);
_gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, _testJobPrefix + "2", 1,
TaskConstants.STATE_MODEL_NAME);
    // Create the workflow and job contexts
WorkflowContext wfCtx = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
wfCtx.setJobState(_testJobPrefix + "0", TaskState.COMPLETED);
wfCtx.setJobState(_testJobPrefix + "1", TaskState.COMPLETED);
wfCtx.setWorkflowState(TaskState.IN_PROGRESS);
wfCtx.setName(_testWorkflow);
wfCtx.setStartTime(System.currentTimeMillis());
JobContext jbCtx0 = new JobContext(new ZNRecord(_testJobPrefix + "0"));
jbCtx0.setName(_testJobPrefix + "0");
jbCtx0.setStartTime(System.currentTimeMillis());
jbCtx0.setPartitionState(0, TaskPartitionState.COMPLETED);
    JobContext jbCtx1 = new JobContext(new ZNRecord(_testJobPrefix + "1"));
jbCtx1.setName(_testJobPrefix + "1");
jbCtx1.setStartTime(System.currentTimeMillis());
jbCtx1.setPartitionState(0, TaskPartitionState.COMPLETED);
taskDataCache.updateWorkflowContext(_testWorkflow, wfCtx);
taskDataCache.updateJobContext(_testJobPrefix + "0", jbCtx0);
taskDataCache.updateJobContext(_testJobPrefix + "1", jbCtx1);
_event.addAttribute(AttributeName.ControllerDataProvider.name(), cache);
// Write contexts to ZK first
TaskPersistDataStage persistDataStage = new TaskPersistDataStage();
persistDataStage.process(_event);
Assert.assertNotNull(_driver.getWorkflowContext(_testWorkflow));
Assert.assertNotNull(_driver.getJobContext(_testJobPrefix + "0"));
Assert.assertNotNull(_driver.getJobContext(_testJobPrefix + "1"));
jbCtx0.setPartitionState(0, TaskPartitionState.ERROR);
wfCtx.setJobState(_testJobPrefix + "0", TaskState.FAILED);
taskDataCache.updateJobContext(_testJobPrefix + "0", jbCtx0);
wfCtx.getJobStates().remove(_testJobPrefix + "1");
taskDataCache.removeContext(_testJobPrefix + "1");
JobContext jbCtx2 = new JobContext(new ZNRecord(_testJobPrefix + "2"));
jbCtx2.setName(_testJobPrefix + "2");
jbCtx2.setPartitionState(1, TaskPartitionState.INIT);
wfCtx.setJobState(_testJobPrefix + "2", TaskState.IN_PROGRESS);
taskDataCache.updateJobContext(_testJobPrefix + "2", jbCtx2);
taskDataCache.updateWorkflowContext(_testWorkflow, wfCtx);
persistDataStage.process(_event);
Assert.assertEquals(_driver.getWorkflowContext(_testWorkflow), wfCtx);
Assert.assertEquals(_driver.getJobContext(_testJobPrefix + "0"), jbCtx0);
Assert.assertEquals(_driver.getJobContext(_testJobPrefix + "2"), jbCtx2);
Assert.assertNull(_driver.getJobContext(_testJobPrefix + "1"));
}

  /**
   * Tests that if a job remains in the DAG but its JobConfig is missing (e.g. because an earlier
   * ZK deletion partially failed), the async job purge attempts the deletion again.
   */
@Test(dependsOnMethods = "testPersistContextData")
public void testPartialDataPurge() throws Exception {
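    // Inline purge worker: executes each queued purge task as soon as it is dequeued, standing in
    // for the controller's async TaskJobPurgeWorker.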
DedupEventProcessor<String, Runnable> worker =
new DedupEventProcessor<String, Runnable>(CLUSTER_NAME,
AsyncWorkerType.TaskJobPurgeWorker.name()) {
@Override
protected void handleEvent(Runnable event) {
event.run();
}
};
worker.start();
Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> workerPool = new HashMap<>();
workerPool.put(AsyncWorkerType.TaskJobPurgeWorker, worker);
_event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), workerPool);
// Manually delete JobConfig
deleteJobConfigs(_testWorkflow, _testJobPrefix + "0");
deleteJobConfigs(_testWorkflow, _testJobPrefix + "1");
deleteJobConfigs(_testWorkflow, _testJobPrefix + "2");
    // Refresh the cache manually since no controller is running to be notified of the data change
BaseControllerDataProvider dataProvider =
_event.getAttribute(AttributeName.ControllerDataProvider.name());
dataProvider.notifyDataChange(HelixConstants.ChangeType.RESOURCE_CONFIG);
dataProvider.refresh(_manager.getHelixDataAccessor());
// Then purge jobs
TaskGarbageCollectionStage garbageCollectionStage = new TaskGarbageCollectionStage();
garbageCollectionStage.process(_event);
    // Verify that the IdealStates and contexts of all three jobs have been purged from both the
    // old and the new ZNode paths
checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "0");
checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "1");
checkForIdealStateAndContextRemoval(_testWorkflow, _testJobPrefix + "2");
}
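
  /**
   * Tests that once a workflow's ResourceConfig has been deleted, the garbage collection stage
   * also removes the workflow's remaining IdealState and context ZNodes.
   */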
@Test(dependsOnMethods = "testPartialDataPurge")
public void testWorkflowGarbageCollection() throws Exception {
DedupEventProcessor<String, Runnable> worker =
new DedupEventProcessor<String, Runnable>(CLUSTER_NAME,
AsyncWorkerType.TaskJobPurgeWorker.name()) {
@Override
protected void handleEvent(Runnable event) {
event.run();
}
};
worker.start();
Map<AsyncWorkerType, DedupEventProcessor<String, Runnable>> workerPool = new HashMap<>();
workerPool.put(AsyncWorkerType.TaskJobPurgeWorker, worker);
_event.addAttribute(AttributeName.AsyncFIFOWorkerPool.name(), workerPool);
String zkPath =
_manager.getHelixDataAccessor().keyBuilder().resourceConfig(_testWorkflow).getPath();
_baseAccessor.remove(zkPath, AccessOption.PERSISTENT);
    // Refresh the cache manually since no controller is running to be notified of the data change
BaseControllerDataProvider dataProvider =
_event.getAttribute(AttributeName.ControllerDataProvider.name());
dataProvider.notifyDataChange(HelixConstants.ChangeType.RESOURCE_CONFIG);
dataProvider.refresh(_manager.getHelixDataAccessor());
// Then garbage collect workflow
TaskGarbageCollectionStage garbageCollectionStage = new TaskGarbageCollectionStage();
garbageCollectionStage.process(_event);
// Check that IS and contexts have been purged for the workflow
checkForIdealStateAndContextRemoval(_testWorkflow);
worker.shutdown();
}
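
  // Deletes the job's ResourceConfig from both the legacy resource-config path and the
  // workflow-scoped job-config path.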
private void deleteJobConfigs(String workflowName, String jobName) {
String oldPath = _manager.getHelixDataAccessor().keyBuilder().resourceConfig(jobName).getPath();
String newPath = _manager.getHelixDataAccessor().keyBuilder()
.jobConfigZNode(workflowName, jobName).getPath();
_baseAccessor.remove(oldPath, AccessOption.PERSISTENT);
_baseAccessor.remove(newPath, AccessOption.PERSISTENT);
}
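
  // Polls for up to two minutes until the job's IdealState and its contexts under both the old
  // and the new ZNode paths are gone.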
private void checkForIdealStateAndContextRemoval(String workflow, String job) throws Exception {
// JobContexts in old ZNode path
String oldPath =
String.format("/%s/PROPERTYSTORE/TaskRebalancer/%s/Context", CLUSTER_NAME, job);
String newPath = _keyBuilder.jobContextZNode(workflow, job).getPath();
Assert.assertTrue(TestHelper.verify(
() -> !_baseAccessor.exists(_keyBuilder.idealStates(job).getPath(), AccessOption.PERSISTENT)
&& !_baseAccessor.exists(oldPath, AccessOption.PERSISTENT) && !_baseAccessor
.exists(newPath, AccessOption.PERSISTENT), 120000));
}
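
  // Polls until the workflow's IdealState and WorkflowContext ZNodes are gone.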
private void checkForIdealStateAndContextRemoval(String workflow) throws Exception {
Assert.assertTrue(TestHelper.verify(() ->
!_baseAccessor.exists(_keyBuilder.idealStates(workflow).getPath(), AccessOption.PERSISTENT)
&& !_baseAccessor
.exists(_keyBuilder.workflowContextZNode(workflow).getPath(), AccessOption.PERSISTENT),
120000));
}
}