blob: 10283d0a8d38ffd4b36ae673511b03464c9592b4 [file] [log] [blame]
package org.apache.helix.task;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import org.apache.helix.common.caches.TaskDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Partition;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
* This unit test makes sure that FixedTargetedTaskAssignmentCalculator makes correct decision for
* targeted jobs
*/
public class TestFixedTargetedTaskAssignmentCalculator {
private static final String CLUSTER_NAME = "TestCluster";
private static final String INSTANCE_PREFIX = "Instance_";
private static final int NUM_PARTICIPANTS = 3;
private static final String WORKFLOW_NAME = "TestWorkflow";
private static final String JOB_NAME = "TestJob";
private static final String PARTITION_NAME = "0";
private static final String TARGET_PARTITION_NAME = "0";
private static final int PARTITION_ID = 0;
private static final String TARGET_RESOURCES = "TestDB";
private Map<String, LiveInstance> _liveInstances;
private Map<String, InstanceConfig> _instanceConfigs;
private ClusterConfig _clusterConfig;
private AssignableInstanceManager _assignableInstanceManager;
@BeforeClass
public void beforeClass() {
// Populate live instances and their corresponding instance configs
_liveInstances = new HashMap<>();
_instanceConfigs = new HashMap<>();
_clusterConfig = new ClusterConfig(CLUSTER_NAME);
for (int i = 0; i < NUM_PARTICIPANTS; i++) {
String instanceName = INSTANCE_PREFIX + i;
LiveInstance liveInstance = new LiveInstance(instanceName);
InstanceConfig instanceConfig = new InstanceConfig(instanceName);
_liveInstances.put(instanceName, liveInstance);
_instanceConfigs.put(instanceName, instanceConfig);
}
}
/**
* Test FixedTargetTaskAssignmentCalculator and make sure that if a job has been assigned
* before and target partition is still on the same instance and in RUNNING state,
* we do not make new assignment for that task.
*/
@Test
public void testFixedTargetTaskAssignmentCalculatorSameInstanceRunningTask() {
_assignableInstanceManager = new AssignableInstanceManager();
_assignableInstanceManager.buildAssignableInstances(_clusterConfig,
new TaskDataCache("CLUSTER_NAME"), _liveInstances, _instanceConfigs);
// Preparing the inputs
String masterInstance = INSTANCE_PREFIX + 1;
String slaveInstance1 = INSTANCE_PREFIX + 2;
String slaveInstance2 = INSTANCE_PREFIX + 3;
CurrentStateOutput currentStateOutput = prepareCurrentState(TaskPartitionState.RUNNING,
masterInstance, slaveInstance1, slaveInstance2);
List<String> instances =
new ArrayList<String>(Arrays.asList(masterInstance, slaveInstance1, slaveInstance2));
JobConfig jobConfig = prepareJobConfig();
JobContext jobContext = prepareJobContext(TaskPartitionState.RUNNING, masterInstance);
WorkflowConfig workflowConfig = prepareWorkflowConfig();
WorkflowContext workflowContext = prepareWorkflowContext();
Set<Integer> partitionSet = new HashSet<>(Collections.singletonList(PARTITION_ID));
Map<String, IdealState> idealStates =
prepareIdealStates(masterInstance, slaveInstance1, slaveInstance2);
TaskAssignmentCalculator taskAssignmentCal =
new FixedTargetTaskAssignmentCalculator(_assignableInstanceManager);
Map<String, SortedSet<Integer>> result =
taskAssignmentCal.getTaskAssignment(currentStateOutput, instances, jobConfig, jobContext,
workflowConfig, workflowContext, partitionSet, idealStates);
Assert.assertEquals(result.get(masterInstance).size(),0);
Assert.assertEquals(result.get(slaveInstance1).size(),0);
Assert.assertEquals(result.get(slaveInstance2).size(),0);
}
/**
* Test FixedTargetTaskAssignmentCalculator and make sure that if a job has been assigned
* before and target partition is still on the same instance and in INIT state,
* we do not make new assignment for that task.
*/
@Test
public void testFixedTargetTaskAssignmentCalculatorSameInstanceInitTask() {
_assignableInstanceManager = new AssignableInstanceManager();
_assignableInstanceManager.buildAssignableInstances(_clusterConfig,
new TaskDataCache("CLUSTER_NAME"), _liveInstances, _instanceConfigs);
// Preparing the inputs
String masterInstance = INSTANCE_PREFIX + 1;
String slaveInstance1 = INSTANCE_PREFIX + 2;
String slaveInstance2 = INSTANCE_PREFIX + 3;
// Preparing the inputs
CurrentStateOutput currentStateOutput = prepareCurrentState(TaskPartitionState.INIT,
masterInstance, slaveInstance1, slaveInstance2);
List<String> instances =
new ArrayList<String>(Arrays.asList(masterInstance, slaveInstance1, slaveInstance2));
JobConfig jobConfig = prepareJobConfig();
JobContext jobContext = prepareJobContext(TaskPartitionState.INIT, masterInstance);
WorkflowConfig workflowConfig = prepareWorkflowConfig();
WorkflowContext workflowContext = prepareWorkflowContext();
Set<Integer> partitionSet = new HashSet<>(Collections.singletonList(PARTITION_ID));
Map<String, IdealState> idealStates =
prepareIdealStates(masterInstance, slaveInstance1, slaveInstance2);
TaskAssignmentCalculator taskAssignmentCal =
new FixedTargetTaskAssignmentCalculator(_assignableInstanceManager);
Map<String, SortedSet<Integer>> result =
taskAssignmentCal.getTaskAssignment(currentStateOutput, instances, jobConfig, jobContext,
workflowConfig, workflowContext, partitionSet, idealStates);
Assert.assertEquals(result.get(masterInstance).size(),0);
Assert.assertEquals(result.get(slaveInstance1).size(),0);
Assert.assertEquals(result.get(slaveInstance2).size(),0);
}
/**
* Test FixedTargetTaskAssignmentCalculator and make sure that if a job has been assigned
* before and target partition has moved to another instance, controller assign the task to
* new/correct instance.
*/
@Test
public void testFixedTargetTaskAssignmentCalculatorDifferentInstance() {
_assignableInstanceManager = new AssignableInstanceManager();
_assignableInstanceManager.buildAssignableInstances(_clusterConfig,
new TaskDataCache("CLUSTER_NAME"), _liveInstances, _instanceConfigs);
// Preparing the inputs
String masterInstance = INSTANCE_PREFIX + 2;
String slaveInstance1 = INSTANCE_PREFIX + 1;
String slaveInstance2 = INSTANCE_PREFIX + 3;
// Preparing the inputs
CurrentStateOutput currentStateOutput = prepareCurrentState(TaskPartitionState.RUNNING,
masterInstance, slaveInstance1, slaveInstance2);
List<String> instances =
new ArrayList<String>(Arrays.asList(masterInstance, slaveInstance1, slaveInstance2));
JobConfig jobConfig = prepareJobConfig();
JobContext jobContext = prepareJobContext(TaskPartitionState.RUNNING, slaveInstance1);
WorkflowConfig workflowConfig = prepareWorkflowConfig();
WorkflowContext workflowContext = prepareWorkflowContext();
Set<Integer> partitionSet = new HashSet<>(Collections.singletonList(PARTITION_ID));
Map<String, IdealState> idealStates =
prepareIdealStates(masterInstance, slaveInstance1, slaveInstance2);
TaskAssignmentCalculator taskAssignmentCal =
new FixedTargetTaskAssignmentCalculator(_assignableInstanceManager);
Map<String, SortedSet<Integer>> result =
taskAssignmentCal.getTaskAssignment(currentStateOutput, instances, jobConfig, jobContext,
workflowConfig, workflowContext, partitionSet, idealStates);
Assert.assertEquals(result.get(slaveInstance1).size(),0);
Assert.assertEquals(result.get(masterInstance).size(),1);
Assert.assertEquals(result.get(slaveInstance2).size(),0);
}
private JobConfig prepareJobConfig() {
JobConfig.Builder jobConfigBuilder = new JobConfig.Builder();
jobConfigBuilder.setWorkflow(WORKFLOW_NAME);
jobConfigBuilder.setCommand("TestCommand");
jobConfigBuilder.setJobId(JOB_NAME);
jobConfigBuilder.setTargetResource(TARGET_RESOURCES);
List<String> targetPartition = new ArrayList<>();
jobConfigBuilder.setTargetPartitions(targetPartition);
Set<String> targetPartitionStates = new HashSet<>(Collections.singletonList("MASTER"));
jobConfigBuilder.setTargetPartitions(targetPartition);
jobConfigBuilder.setTargetPartitionStates(targetPartitionStates);
List<TaskConfig> taskConfigs = new ArrayList<>();
TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder();
taskConfigBuilder.setTaskId("0");
taskConfigs.add(taskConfigBuilder.build());
jobConfigBuilder.addTaskConfigs(taskConfigs);
return jobConfigBuilder.build();
}
private JobContext prepareJobContext(TaskPartitionState taskPartitionState, String instance) {
ZNRecord record = new ZNRecord(JOB_NAME);
JobContext jobContext = new JobContext(record);
jobContext.setStartTime(0L);
jobContext.setName(JOB_NAME);
jobContext.setStartTime(0L);
jobContext.setPartitionState(PARTITION_ID, taskPartitionState);
jobContext.setPartitionTarget(PARTITION_ID, TARGET_RESOURCES + "_" + TARGET_PARTITION_NAME);
jobContext.setAssignedParticipant(PARTITION_ID, instance);
return jobContext;
}
private CurrentStateOutput prepareCurrentState(TaskPartitionState taskCurrentState,
String masterInstance, String slaveInstance1, String slaveInstance2) {
CurrentStateOutput currentStateOutput = new CurrentStateOutput();
currentStateOutput.setResourceStateModelDef(JOB_NAME, "TASK");
Partition taskPartition = new Partition(JOB_NAME + "_" + PARTITION_NAME);
currentStateOutput.setCurrentState(JOB_NAME, taskPartition, masterInstance,
taskCurrentState.name());
Partition dbPartition = new Partition(TARGET_RESOURCES + "_0");
currentStateOutput.setEndTime(TARGET_RESOURCES, dbPartition, masterInstance, 0L);
currentStateOutput.setCurrentState(TARGET_RESOURCES, dbPartition, masterInstance, "MASTER");
currentStateOutput.setCurrentState(TARGET_RESOURCES, dbPartition, slaveInstance1, "SLAVE");
currentStateOutput.setCurrentState(TARGET_RESOURCES, dbPartition, slaveInstance2, "SLAVE");
currentStateOutput.setInfo(TARGET_RESOURCES, dbPartition, masterInstance, "");
return currentStateOutput;
}
private WorkflowConfig prepareWorkflowConfig() {
WorkflowConfig.Builder workflowConfigBuilder = new WorkflowConfig.Builder();
workflowConfigBuilder.setWorkflowId(WORKFLOW_NAME);
workflowConfigBuilder.setTerminable(false);
workflowConfigBuilder.setTargetState(TargetState.START);
workflowConfigBuilder.setJobQueue(true);
JobDag jobDag = new JobDag();
jobDag.addNode(JOB_NAME);
workflowConfigBuilder.setJobDag(jobDag);
return workflowConfigBuilder.build();
}
private WorkflowContext prepareWorkflowContext() {
ZNRecord record = new ZNRecord(WORKFLOW_NAME);
record.setSimpleField(WorkflowContext.WorkflowContextProperties.StartTime.name(), "0");
record.setSimpleField(WorkflowContext.WorkflowContextProperties.NAME.name(), WORKFLOW_NAME);
record.setSimpleField(WorkflowContext.WorkflowContextProperties.STATE.name(),
TaskState.IN_PROGRESS.name());
Map<String, String> jobState = new HashMap<>();
jobState.put(JOB_NAME, TaskState.IN_PROGRESS.name());
record.setMapField(WorkflowContext.WorkflowContextProperties.JOB_STATES.name(), jobState);
return new WorkflowContext(record);
}
private Map<String, IdealState> prepareIdealStates(String instance1, String instance2,
String instance3) {
Map<String, IdealState> idealStates = new HashMap<>();
ZNRecord recordDB = new ZNRecord(TARGET_RESOURCES);
recordDB.setSimpleField(IdealState.IdealStateProperty.REPLICAS.name(), "3");
recordDB.setSimpleField(IdealState.IdealStateProperty.REBALANCE_MODE.name(), "FULL_AUTO");
recordDB.setSimpleField(IdealState.IdealStateProperty.IDEAL_STATE_MODE.name(),
"AUTO_REBALANCE");
recordDB.setSimpleField(IdealState.IdealStateProperty.STATE_MODEL_DEF_REF.name(),
"MasterSlave");
recordDB.setSimpleField(IdealState.IdealStateProperty.STATE_MODEL_DEF_REF.name(),
"org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy");
recordDB.setSimpleField(IdealState.IdealStateProperty.REBALANCER_CLASS_NAME.name(),
"org.apache.helix.controller.rebalancer.DelayedAutoRebalancer");
Map<String, String> mapping = new HashMap<>();
mapping.put(instance1, "MASTER");
mapping.put(instance2, "SLAVE");
mapping.put(instance3, "SLAVE");
recordDB.setMapField(TARGET_RESOURCES + "_0", mapping);
List<String> listField = new ArrayList<>();
listField.add(instance1);
listField.add(instance2);
listField.add(instance3);
recordDB.setListField(TARGET_RESOURCES + "_0", listField);
idealStates.put(TARGET_RESOURCES, new IdealState(recordDB));
return idealStates;
}
}