package org.apache.helix.task;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.helix.common.caches.TaskDataCache;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.task.assigner.AssignableInstance;
import org.apache.helix.task.assigner.TaskAssignResult;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

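// Unit tests for AssignableInstanceManager. MockTaskDataCache and MockJobContext below supply
// in-memory job metadata so that buildAssignableInstances and updateAssignableInstances can be
// exercised without a real cluster.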
public class TestAssignableInstanceManager {
private static final int NUM_PARTICIPANTS = 3;
private static final int NUM_JOBS = 3;
private static final int NUM_TASKS = 3;
private static final String CLUSTER_NAME = "TestCluster_0";
private static final String INSTANCE_PREFIX = "Instance_";
private static final String JOB_PREFIX = "Job_";
private static final String TASK_PREFIX = "Task_";
private ClusterConfig _clusterConfig;
private MockTaskDataCache _taskDataCache;
private AssignableInstanceManager _assignableInstanceManager;
private Map<String, LiveInstance> _liveInstances;
private Map<String, InstanceConfig> _instanceConfigs;
private Set<String> _taskIDs; // To keep track of what tasks were created

  @BeforeClass
public void beforeClass() {
System.out.println(
"START " + this.getClass().getSimpleName() + " at " + new Date(System.currentTimeMillis()));
_clusterConfig = new ClusterConfig(CLUSTER_NAME);
_taskDataCache = new MockTaskDataCache(CLUSTER_NAME);
_liveInstances = new HashMap<>();
_instanceConfigs = new HashMap<>();
_taskIDs = new HashSet<>();

    // Populate live instances and their corresponding instance configs
for (int i = 0; i < NUM_PARTICIPANTS; i++) {
String instanceName = INSTANCE_PREFIX + i;
LiveInstance liveInstance = new LiveInstance(instanceName);
InstanceConfig instanceConfig = new InstanceConfig(instanceName);
_liveInstances.put(instanceName, liveInstance);
_instanceConfigs.put(instanceName, instanceConfig);
}
    // Populate taskDataCache with JobConfigs and JobContexts
    for (int i = 0; i < NUM_JOBS; i++) {
      String jobName = JOB_PREFIX + i;

      // Create a JobConfig with NUM_TASKS TaskConfigs
      JobConfig.Builder jobConfigBuilder = new JobConfig.Builder();
      List<TaskConfig> taskConfigs = new ArrayList<>();
      Set<String> jobTaskIDs = new HashSet<>(); // Task IDs belonging to this job only
      for (int j = 0; j < NUM_TASKS; j++) {
        String taskID = jobName + "_" + TASK_PREFIX + j;
        TaskConfig.Builder taskConfigBuilder = new TaskConfig.Builder();
        taskConfigBuilder.setTaskId(taskID);
        jobTaskIDs.add(taskID);
        _taskIDs.add(taskID);
        taskConfigs.add(taskConfigBuilder.build());
      }
      jobConfigBuilder.setJobId(jobName);
      jobConfigBuilder.addTaskConfigs(taskConfigs);
      jobConfigBuilder.setCommand("MOCK");
      jobConfigBuilder.setWorkflow("WORKFLOW");
      _taskDataCache.addJobConfig(jobName, jobConfigBuilder.build());

      // Create a JobContext that marks this job's tasks as RUNNING on a live instance.
      // Only this job's task IDs are passed so the context matches its JobConfig;
      // _taskIDs keeps accumulating across jobs for use in the test assertions.
      ZNRecord znRecord = new ZNRecord(JOB_PREFIX + "context_" + i);
      JobContext jobContext = new MockJobContext(znRecord, _liveInstances, jobTaskIDs);
      _taskDataCache.addJobContext(jobName, jobContext);
    }

// Create an AssignableInstanceManager and build
_assignableInstanceManager = new AssignableInstanceManager();
_assignableInstanceManager.buildAssignableInstances(_clusterConfig, _taskDataCache,
_liveInstances, _instanceConfigs);
}
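
  // Every live instance handed to buildAssignableInstances should appear in the
  // AssignableInstance map.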
@Test
public void testGetAssignableInstanceMap() {
Map<String, AssignableInstance> assignableInstanceMap =
_assignableInstanceManager.getAssignableInstanceMap();
for (String liveInstance : _liveInstances.keySet()) {
Assert.assertTrue(assignableInstanceMap.containsKey(liveInstance));
}
}
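
  // Each task created in beforeClass is RUNNING in its JobContext, so buildAssignableInstances
  // should have recorded a TaskAssignResult for every task ID.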
@Test(dependsOnMethods = "testGetAssignableInstanceMap")
public void testGetTaskAssignResultMap() {
Map<String, TaskAssignResult> taskAssignResultMap =
_assignableInstanceManager.getTaskAssignResultMap();
for (String taskID : _taskIDs) {
Assert.assertTrue(taskAssignResultMap.containsKey(taskID));
}
}
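
  // Updating with a brand new live-instance set should rebuild the AssignableInstance map from
  // the new instances and drop TaskAssignResults that belonged to the removed instances.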
@Test(dependsOnMethods = "testGetTaskAssignResultMap")
public void testUpdateAssignableInstances() {
Map<String, LiveInstance> newLiveInstances = new HashMap<>();
Map<String, InstanceConfig> newInstanceConfigs = new HashMap<>();
// A brand new set of LiveInstances
for (int i = NUM_PARTICIPANTS; i < NUM_PARTICIPANTS + 3; i++) {
String instanceName = INSTANCE_PREFIX + i;
newLiveInstances.put(instanceName, new LiveInstance(instanceName));
newInstanceConfigs.put(instanceName, new InstanceConfig(instanceName));
}
_assignableInstanceManager.updateAssignableInstances(_clusterConfig, newLiveInstances,
newInstanceConfigs);
// Check that the assignable instance map contains new instances and there are no
// TaskAssignResults due to previous live instances being removed
Assert.assertEquals(_assignableInstanceManager.getTaskAssignResultMap().size(), 0);
Assert.assertEquals(_assignableInstanceManager.getAssignableInstanceMap().size(),
newLiveInstances.size());
for (String instance : newLiveInstances.keySet()) {
Assert
.assertTrue(_assignableInstanceManager.getAssignableInstanceMap().containsKey(instance));
}
}
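
  // In-memory TaskDataCache stand-in: serves JobConfigs, JobContexts, and their workflow
  // equivalents straight from maps so tests can populate task metadata directly.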
public class MockTaskDataCache extends TaskDataCache {
private Map<String, JobConfig> _jobConfigMap;
private Map<String, WorkflowConfig> _workflowConfigMap;
private Map<String, JobContext> _jobContextMap;
private Map<String, WorkflowContext> _workflowContextMap;
public MockTaskDataCache(String clusterName) {
super(clusterName);
_jobConfigMap = new HashMap<>();
_workflowConfigMap = new HashMap<>();
_jobContextMap = new HashMap<>();
_workflowContextMap = new HashMap<>();
}
public void addJobConfig(String jobName, JobConfig jobConfig) {
_jobConfigMap.put(jobName, jobConfig);
}
public void addJobContext(String jobName, JobContext jobContext) {
_jobContextMap.put(jobName, jobContext);
}
public void addWorkflowConfig(String workflowName, WorkflowConfig workflowConfig) {
_workflowConfigMap.put(workflowName, workflowConfig);
}
public void addWorkflowContext(String workflowName, WorkflowContext workflowContext) {
_workflowContextMap.put(workflowName, workflowContext);
}
@Override
public JobContext getJobContext(String jobName) {
return _jobContextMap.get(jobName);
}
@Override
public Map<String, JobConfig> getJobConfigMap() {
return _jobConfigMap;
}
@Override
public Map<String, WorkflowConfig> getWorkflowConfigMap() {
return _workflowConfigMap;
}
public Map<String, JobContext> getJobContextMap() {
return _jobContextMap;
}
public Map<String, WorkflowContext> getWorkflowContextMap() {
return _workflowContextMap;
}
}
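
  // JobContext stub that reports every provided task as RUNNING and assigned to one of the
  // given live instances.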
public class MockJobContext extends JobContext {
private Set<Integer> _taskPartitionSet;
private Map<Integer, TaskPartitionState> _taskPartitionStateMap;
private Map<Integer, String> _partitionToTaskIDMap;
private Map<Integer, String> _taskToInstanceMap;
public MockJobContext(ZNRecord record, Map<String, LiveInstance> liveInstanceMap,
Set<String> taskIDs) {
super(record);
_taskPartitionSet = new HashSet<>();
_taskPartitionStateMap = new HashMap<>();
_partitionToTaskIDMap = new HashMap<>();
_taskToInstanceMap = new HashMap<>();
List<String> taskIDList = new ArrayList<>(taskIDs);
for (int i = 0; i < taskIDList.size(); i++) {
_taskPartitionSet.add(i);
_taskPartitionStateMap.put(i, TaskPartitionState.RUNNING);
_partitionToTaskIDMap.put(i, taskIDList.get(i));
String someInstance = liveInstanceMap.keySet().iterator().next();
_taskToInstanceMap.put(i, someInstance);
}
}
@Override
public Set<Integer> getPartitionSet() {
return _taskPartitionSet;
}
@Override
public TaskPartitionState getPartitionState(int p) {
return _taskPartitionStateMap.get(p);
}
@Override
public String getAssignedParticipant(int p) {
return _taskToInstanceMap.get(p);
}
@Override
public String getTaskIdForPartition(int p) {
return _partitionToTaskIDMap.get(p);
}
}
}