package org.apache.helix.common.caches;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyType;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.common.controllers.ControlContextProvider;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.task.AssignableInstanceManager;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.JobContext;
import org.apache.helix.task.RuntimeJobDag;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Cache for holding all task related cluster data, such as WorkflowConfig, JobConfig and Contexts.
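 * <p>A minimal usage sketch (the accessor and resource-config map below are hypothetical
 * placeholders supplied by the caller, not part of this class):
 * <pre>{@code
 * TaskDataCache cache = new TaskDataCache("myCluster");
 * cache.refresh(accessor, resourceConfigMap); // re-reads configs and contexts from ZooKeeper
 * WorkflowConfig workflowConfig = cache.getWorkflowConfig("myWorkflow");
 * JobContext jobContext = cache.getJobContext("myWorkflow_myJob");
 * }</pre>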
*/
public class TaskDataCache extends AbstractDataCache {
private static final Logger LOG = LoggerFactory.getLogger(TaskDataCache.class.getName());
private static final String NAME = "NAME";
private Map<String, JobConfig> _jobConfigMap = new HashMap<>();
private Map<String, RuntimeJobDag> _runtimeJobDagMap = new HashMap<>();
private Map<String, WorkflowConfig> _workflowConfigMap = new ConcurrentHashMap<>();
  // TODO: Context and previous assignment should be wrapped into a class; otherwise,
  // concurrency will be hard to handle in the future.
private Map<String, ZNRecord> _contextMap = new HashMap<>();
private Map<String, ZNRecord> _prevAssignmentMap = new HashMap<>();
private Set<String> _prevAssignmentToUpdate = new HashSet<>();
private Set<String> _prevAssignmentToRemove = new HashSet<>();
private Set<String> _contextToUpdate = new HashSet<>();
private Set<String> _contextToRemove = new HashSet<>();
// The following fields have been added for quota-based task scheduling
private final AssignableInstanceManager _assignableInstanceManager =
new AssignableInstanceManager();
  // Tracks the jobs that have already been processed by JobDispatcher from RESOURCE_TO_BALANCE,
  // so that redundant computation can be avoided.
private Set<String> _dispatchedJobs = new HashSet<>();
private enum TaskDataType {
CONTEXT,
PREV_ASSIGNMENT
}
public TaskDataCache(ControlContextProvider contextProvider) {
super(contextProvider);
}
  /**
   * Original constructor for TaskDataCache.
   *
   * @param clusterName name of the cluster this cache serves
   */
public TaskDataCache(String clusterName) {
this(createDefaultControlContextProvider(clusterName));
}
  /**
   * Refreshes the task-related cluster data by re-fetching it from ZooKeeper in an efficient way.
   *
   * @param accessor the HelixDataAccessor used to read from ZooKeeper
   * @param resourceConfigMap resource configs from which workflow and job configs are derived
   * @return true once the refresh completes
   */
public synchronized boolean refresh(HelixDataAccessor accessor,
Map<String, ResourceConfig> resourceConfigMap) {
refreshContextsAndPreviousAssignments(accessor);
// update workflow and job configs.
_workflowConfigMap.clear();
Map<String, JobConfig> newJobConfigs = new HashMap<>();
Set<String> workflowsUpdated = new HashSet<>();
for (Map.Entry<String, ResourceConfig> entry : resourceConfigMap.entrySet()) {
if (entry.getValue().getRecord().getSimpleFields()
.containsKey(WorkflowConfig.WorkflowConfigProperty.Dag.name())) {
_workflowConfigMap.put(entry.getKey(), new WorkflowConfig(entry.getValue()));
if (!_runtimeJobDagMap.containsKey(entry.getKey())) {
WorkflowConfig workflowConfig = _workflowConfigMap.get(entry.getKey());
_runtimeJobDagMap.put(entry.getKey(), new RuntimeJobDag(workflowConfig.getJobDag(),
workflowConfig.isJobQueue() || !workflowConfig.isTerminable(),
workflowConfig.getParallelJobs()));
}
} else if (entry.getValue().getRecord().getSimpleFields()
.containsKey(WorkflowConfig.WorkflowConfigProperty.WorkflowID.name())) {
newJobConfigs.put(entry.getKey(), new JobConfig(entry.getValue()));
}
}
    // The following three blocks find the workflows whose JobDAGs have changed, because their
    // RuntimeJobDags need to be rebuilt.
    // Newly added jobs
for (String jobName : newJobConfigs.keySet()) {
if (!_jobConfigMap.containsKey(jobName) && newJobConfigs.get(jobName).getWorkflow() != null) {
workflowsUpdated.add(newJobConfigs.get(jobName).getWorkflow());
}
      // Only for JobQueues: when a new job is enqueued, there is a race condition in which only
      // the JobConfig is updated and the RuntimeJobDag is not, because the client (TaskDriver)
      // creates the JobConfig ZNode first and modifies its parent JobDag afterwards. To ensure
      // both are properly updated, check that the workflow's DAG and the existing JobConfigs
      // are consistent for JobQueues.
JobConfig jobConfig = newJobConfigs.get(jobName);
if (_workflowConfigMap.containsKey(jobConfig.getWorkflow())) {
WorkflowConfig workflowConfig = _workflowConfigMap.get(jobConfig.getWorkflow());
// Check that the job's parent workflow's DAG contains this job
if ((workflowConfig.isJobQueue() || !workflowConfig.isTerminable()) && !_runtimeJobDagMap
.get(workflowConfig.getWorkflowId()).getAllNodes().contains(jobName)) {
// Inconsistency between JobConfigs and DAGs found. Add the workflow to workflowsUpdated
// to rebuild the RuntimeJobDag
workflowsUpdated.add(jobConfig.getWorkflow());
}
}
}
// Removed jobs
for (String jobName : _jobConfigMap.keySet()) {
if (!newJobConfigs.containsKey(jobName) && _jobConfigMap.get(jobName).getWorkflow() != null) {
workflowsUpdated.add(_jobConfigMap.get(jobName).getWorkflow());
}
}
    // Rebuild the RuntimeJobDag of every workflow whose JobDAG needs an update
for (String changedWorkflow : workflowsUpdated) {
if (_workflowConfigMap.containsKey(changedWorkflow)) {
WorkflowConfig workflowConfig = _workflowConfigMap.get(changedWorkflow);
_runtimeJobDagMap.put(changedWorkflow, new RuntimeJobDag(workflowConfig.getJobDag(),
workflowConfig.isJobQueue() || !workflowConfig.isTerminable(),
workflowConfig.getParallelJobs()));
}
}
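    // Reset the per-pipeline dispatch bookkeeping and drop runtime DAGs for workflows that no
    // longer have a config.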
_dispatchedJobs.clear();
_runtimeJobDagMap.keySet().retainAll(_workflowConfigMap.keySet());
_jobConfigMap = newJobConfigs;
return true;
}
private void refreshContextsAndPreviousAssignments(HelixDataAccessor accessor) {
    // TODO: Optimize so that contexts are read only when a refresh is actually needed.
long start = System.currentTimeMillis();
_contextMap.clear();
_prevAssignmentMap.clear();
if (_controlContextProvider.getClusterName() == null || _controlContextProvider.getClusterName()
.equalsIgnoreCase(UNKNOWN_CLUSTER)) {
return;
}
String path = String.format("/%s/%s%s", _controlContextProvider.getClusterName(),
PropertyType.PROPERTYSTORE.name(), TaskConstants.REBALANCER_CONTEXT_ROOT);
List<String> contextPaths = new ArrayList<>();
List<String> prevAssignmentPaths = new ArrayList<>();
List<String> childNames = accessor.getBaseDataAccessor().getChildNames(path, 0);
if (childNames == null) {
return;
}
for (String resourceName : childNames) {
contextPaths.add(getTaskDataPath(resourceName, TaskDataType.CONTEXT));
      // Workflows do not have previous assignments
if (!_workflowConfigMap.containsKey(resourceName)) {
prevAssignmentPaths.add(getTaskDataPath(resourceName, TaskDataType.PREV_ASSIGNMENT));
}
}
List<ZNRecord> contexts = accessor.getBaseDataAccessor().get(contextPaths, null, 0);
List<ZNRecord> prevAssignments =
accessor.getBaseDataAccessor().get(prevAssignmentPaths, null, 0);
for (int i = 0; i < contexts.size(); i++) {
ZNRecord context = contexts.get(i);
if (context != null && context.getSimpleField(NAME) != null) {
_contextMap.put(context.getSimpleField(NAME), context);
} else {
_contextMap.put(childNames.get(i), context);
        LogUtil.logDebug(LOG, genEventInfo(),
            String.format("Context for %s is null or missing the context NAME!", childNames.get(i)));
}
}
for (ZNRecord prevAssignment : prevAssignments) {
if (prevAssignment != null) {
_prevAssignmentMap.put(prevAssignment.getId(), prevAssignment);
}
}
if (LOG.isDebugEnabled()) {
LogUtil.logDebug(LOG, genEventInfo(),
"# of workflow/job context read from zk: " + _contextMap.size() + ". Take " + (
System.currentTimeMillis() - start) + " ms");
}
}
  /**
   * Returns the job config map.
   *
   * @return a map of job name to JobConfig
   */
public Map<String, JobConfig> getJobConfigMap() {
return _jobConfigMap;
}
  /**
   * Returns the job config for the given resource.
   *
   * @param resource the job resource name
   * @return the JobConfig, or null if none is cached
   */
public JobConfig getJobConfig(String resource) {
return _jobConfigMap.get(resource);
}
  /**
   * Returns the workflow config map.
   *
   * @return a map of workflow name to WorkflowConfig
   */
public Map<String, WorkflowConfig> getWorkflowConfigMap() {
return _workflowConfigMap;
}
  /**
   * Returns the workflow config for the given resource.
   *
   * @param resource the workflow resource name
   * @return the WorkflowConfig, or null if none is cached
   */
public WorkflowConfig getWorkflowConfig(String resource) {
return _workflowConfigMap.get(resource);
}
  /**
   * Returns the JobContext for the given resource name.
   *
   * @param resourceName the job resource name
   * @return the JobContext, or null if no context is cached
   */
public JobContext getJobContext(String resourceName) {
if (_contextMap.containsKey(resourceName) && _contextMap.get(resourceName) != null) {
return new JobContext(_contextMap.get(resourceName));
}
return null;
}
  /**
   * Returns the WorkflowContext for the given resource name.
   *
   * @param resourceName the workflow resource name
   * @return the WorkflowContext, or null if no context is cached
   */
public WorkflowContext getWorkflowContext(String resourceName) {
if (_contextMap.containsKey(resourceName) && _contextMap.get(resourceName) != null) {
return new WorkflowContext(_contextMap.get(resourceName));
}
return null;
}
/**
* Update context of the Job
*/
public void updateJobContext(String resourceName, JobContext jobContext) {
updateContext(resourceName, jobContext.getRecord());
}
/**
* Update context of the Workflow
*/
public void updateWorkflowContext(String resourceName, WorkflowContext workflowContext) {
updateContext(resourceName, workflowContext.getRecord());
}
/**
* Update context of the Workflow or Job
*/
private void updateContext(String resourceName, ZNRecord record) {
_contextMap.put(resourceName, record);
_contextToUpdate.add(resourceName);
}
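  /**
   * Persists pending context and previous-assignment changes to ZooKeeper. Entries whose write
   * fails remain in the update sets so that a later call can retry; the removal sets are cleared
   * regardless (see the TODO in batchDeleteData).
   *
   * <p>A minimal sketch of the intended call pattern (the accessor here is a hypothetical
   * placeholder):
   * <pre>{@code
   * cache.updateJobContext(jobName, jobContext); // stages the change in memory
   * cache.persistDataChanges(accessor);          // flushes staged changes to ZooKeeper
   * }</pre>
   */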
public void persistDataChanges(HelixDataAccessor accessor) {
    // Do not update contexts that are scheduled for removal
_contextToUpdate.removeAll(_contextToRemove);
batchUpdateData(accessor, new ArrayList<>(_contextToUpdate), _contextMap, _contextToUpdate,
TaskDataType.CONTEXT);
batchDeleteData(accessor, new ArrayList<>(_contextToRemove), TaskDataType.CONTEXT);
_contextToRemove.clear();
_prevAssignmentToUpdate.removeAll(_prevAssignmentToRemove);
batchUpdateData(accessor, new ArrayList<>(_prevAssignmentToUpdate), _prevAssignmentMap,
_prevAssignmentToUpdate, TaskDataType.PREV_ASSIGNMENT);
batchDeleteData(accessor, new ArrayList<>(_prevAssignmentToRemove),
TaskDataType.PREV_ASSIGNMENT);
_prevAssignmentToRemove.clear();
}
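  /**
   * Writes the given records to ZooKeeper in one batched setChildren call. Successfully written
   * names are removed from dataToUpdate; failures are logged and left in the set for retry.
   */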
private void batchUpdateData(HelixDataAccessor accessor, List<String> dataUpdateNames,
Map<String, ZNRecord> dataMap, Set<String> dataToUpdate, TaskDataType taskDataType) {
List<String> contextUpdatePaths = new ArrayList<>();
List<ZNRecord> updatedData = new ArrayList<>();
for (String resourceName : dataUpdateNames) {
if (dataMap.get(resourceName) != null) {
contextUpdatePaths.add(getTaskDataPath(resourceName, taskDataType));
updatedData.add(dataMap.get(resourceName));
}
}
boolean[] updateSuccess = accessor.getBaseDataAccessor()
.setChildren(contextUpdatePaths, updatedData, AccessOption.PERSISTENT);
for (int i = 0; i < updateSuccess.length; i++) {
if (updateSuccess[i]) {
dataToUpdate.remove(dataUpdateNames.get(i));
} else {
LogUtil.logWarn(LOG, _controlContextProvider.getClusterEventId(), String
.format("Failed to update the %s for %s", taskDataType.name(), dataUpdateNames.get(i)));
}
}
}
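  /**
   * Deletes the given task data nodes from ZooKeeper in one batched remove call.
   */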
private void batchDeleteData(HelixDataAccessor accessor, List<String> contextNamesToRemove,
TaskDataType taskDataType) {
    // Delete contexts.
    // We cannot simply drop these entries from memory, since some of the deletions happen when
    // cleaning up a workflow; if a context lingers in memory, Helix will not allow the user to
    // create a workflow with the same name.
    // TODO: Add a periodic cleanup thread that removes contexts whose deletion failed.
List<String> contextPathsToRemove = new ArrayList<>();
for (String resourceName : contextNamesToRemove) {
contextPathsToRemove.add(getTaskDataPath(resourceName, taskDataType));
}
    // TODO: The current behavior is that deleting non-existent data returns false. Once that is
    // fixed, we can add retry logic back; until then, a failed deletion stays in memory and
    // prevents recreating a workflow with the same name.
accessor.getBaseDataAccessor().remove(contextPathsToRemove, AccessOption.PERSISTENT);
}
  /**
   * Returns the map of WorkflowContext and JobContext records.
   *
   * @return a map of resource name to context ZNRecord
   */
public Map<String, ZNRecord> getContexts() {
return _contextMap;
}
  /**
   * Returns the current AssignableInstanceManager instance.
   *
   * @return the AssignableInstanceManager used for quota-based task scheduling
   */
public AssignableInstanceManager getAssignableInstanceManager() {
return _assignableInstanceManager;
}
  /**
   * Removes a Workflow or Job context from the cache and schedules it for deletion.
   *
   * @param resourceName the workflow or job resource name
   */
public void removeContext(String resourceName) {
if (_contextMap.containsKey(resourceName)) {
_contextMap.remove(resourceName);
_contextToRemove.add(resourceName);
}
}
@Override
public String toString() {
return "TaskDataCache{" + "_jobConfigMap=" + _jobConfigMap + ", _workflowConfigMap="
+ _workflowConfigMap + ", _contextMap=" + _contextMap + ", _clusterName='"
+ _controlContextProvider.getClusterName() + '\'' + '}';
}
  /**
   * Gets the property-store path for the given resource's data of the given type, i.e.
   * /{clusterName}/PROPERTYSTORE{REBALANCER_CONTEXT_ROOT}/{resourceName}/{node}. If the type is
   * not recognized, null is returned instead.
   *
   * @param resourceName the workflow or job resource name
   * @param taskDataType the type of task data (context or previous assignment)
   * @return the ZooKeeper path, or null for an unknown type
   */
  private String getTaskDataPath(String resourceName, TaskDataType taskDataType) {
    String prefix = String.format("/%s/%s%s/%s", _controlContextProvider.getClusterName(),
        PropertyType.PROPERTYSTORE.name(), TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName);
    switch (taskDataType) {
      case CONTEXT:
        return String.format("%s/%s", prefix, TaskConstants.CONTEXT_NODE);
      case PREV_ASSIGNMENT:
        return String.format("%s/%s", prefix, TaskConstants.PREV_RA_NODE);
    }
    return null;
}
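  /**
   * Marks a job as dispatched for the current pipeline run so JobDispatcher can skip it.
   */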
public void dispatchJob(String jobName) {
_dispatchedJobs.add(jobName);
}
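  /**
   * Clears a job's dispatched flag so it can be processed again.
   */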
public void removeDispatchedJob(String jobName) {
_dispatchedJobs.remove(jobName);
}
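  /**
   * Returns the set of jobs dispatched during the current pipeline run.
   */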
public Set<String> getDispatchedJobs() {
return _dispatchedJobs;
}
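  /**
   * Returns the cached RuntimeJobDag for the given workflow, or null if none exists.
   */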
public RuntimeJobDag getRuntimeJobDag(String workflowName) {
if (_runtimeJobDagMap.containsKey(workflowName)) {
return _runtimeJobDagMap.get(workflowName);
}
return null;
}
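  /**
   * Returns the previous ResourceAssignment for the given resource, or null if none is cached.
   */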
public ResourceAssignment getPreviousAssignment(String resourceName) {
return _prevAssignmentMap.get(resourceName) != null ? new ResourceAssignment(
_prevAssignmentMap.get(resourceName)) : null;
}
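  /**
   * Caches the given resource's previous assignment and schedules it for persistence.
   */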
public void setPreviousAssignment(String resourceName, ResourceAssignment prevAssignment) {
_prevAssignmentMap.put(resourceName, prevAssignment.getRecord());
_prevAssignmentToUpdate.add(resourceName);
}
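  /**
   * Removes the given resource's previous assignment from the cache and schedules its deletion.
   */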
public void removePrevAssignment(String resourceName) {
_prevAssignmentMap.remove(resourceName);
_prevAssignmentToRemove.add(resourceName);
}
}