blob: 6b197100d647243de75c9f4fe05cf130a7c18319 [file] [log] [blame]
/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/
package io.elasticjob.cloud.scheduler.state.failover;
import io.elasticjob.cloud.scheduler.config.job.CloudJobConfiguration;
import io.elasticjob.cloud.scheduler.config.job.CloudJobConfigurationService;
import io.elasticjob.cloud.scheduler.env.BootstrapEnvironment;
import io.elasticjob.cloud.scheduler.state.running.RunningService;
import io.elasticjob.cloud.context.ExecutionType;
import io.elasticjob.cloud.scheduler.context.JobContext;
import io.elasticjob.cloud.context.TaskContext;
import io.elasticjob.cloud.reg.base.CoordinatorRegistryCenter;
import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.hash.HashCode;
import com.google.common.hash.Hashing;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* 失效转移队列服务.
*
* @author zhangliang
*/
@Slf4j
public final class FailoverService {
private final BootstrapEnvironment env = BootstrapEnvironment.getInstance();
private final CoordinatorRegistryCenter regCenter;
private final CloudJobConfigurationService configService;
private final RunningService runningService;
public FailoverService(final CoordinatorRegistryCenter regCenter) {
this.regCenter = regCenter;
configService = new CloudJobConfigurationService(regCenter);
runningService = new RunningService(regCenter);
}
/**
* 将任务放入失效转移队列.
*
* @param taskContext 任务运行时上下文
*/
public void add(final TaskContext taskContext) {
if (regCenter.getNumChildren(FailoverNode.ROOT) > env.getFrameworkConfiguration().getJobStateQueueSize()) {
log.warn("Cannot add job, caused by read state queue size is larger than {}.", env.getFrameworkConfiguration().getJobStateQueueSize());
return;
}
String failoverTaskNodePath = FailoverNode.getFailoverTaskNodePath(taskContext.getMetaInfo().toString());
if (!regCenter.isExisted(failoverTaskNodePath) && !runningService.isTaskRunning(taskContext.getMetaInfo())) {
// TODO Daemon类型作业增加存储是否立即失效转移
regCenter.persist(failoverTaskNodePath, taskContext.getId());
}
}
/**
* 从失效转移队列中获取所有有资格执行的作业上下文.
*
* @return 有资格执行的作业上下文集合
*/
public Collection<JobContext> getAllEligibleJobContexts() {
if (!regCenter.isExisted(FailoverNode.ROOT)) {
return Collections.emptyList();
}
List<String> jobNames = regCenter.getChildrenKeys(FailoverNode.ROOT);
Collection<JobContext> result = new ArrayList<>(jobNames.size());
Set<HashCode> assignedTasks = new HashSet<>(jobNames.size() * 10, 1);
for (String each : jobNames) {
List<String> taskIdList = regCenter.getChildrenKeys(FailoverNode.getFailoverJobNodePath(each));
if (taskIdList.isEmpty()) {
regCenter.remove(FailoverNode.getFailoverJobNodePath(each));
continue;
}
Optional<CloudJobConfiguration> jobConfig = configService.load(each);
if (!jobConfig.isPresent()) {
regCenter.remove(FailoverNode.getFailoverJobNodePath(each));
continue;
}
List<Integer> assignedShardingItems = getAssignedShardingItems(each, taskIdList, assignedTasks);
if (!assignedShardingItems.isEmpty() && jobConfig.isPresent()) {
result.add(new JobContext(jobConfig.get(), assignedShardingItems, ExecutionType.FAILOVER));
}
}
return result;
}
private List<Integer> getAssignedShardingItems(final String jobName, final List<String> taskIdList, final Set<HashCode> assignedTasks) {
List<Integer> result = new ArrayList<>(taskIdList.size());
for (String each : taskIdList) {
TaskContext.MetaInfo metaInfo = TaskContext.MetaInfo.from(each);
if (assignedTasks.add(Hashing.md5().newHasher().putString(jobName, Charsets.UTF_8).putInt(metaInfo.getShardingItems().get(0)).hash()) && !runningService.isTaskRunning(metaInfo)) {
result.add(metaInfo.getShardingItems().get(0));
}
}
return result;
}
/**
* 从失效转移队列中删除相关任务.
*
* @param metaInfoList 待删除的任务元信息集合
*/
public void remove(final Collection<TaskContext.MetaInfo> metaInfoList) {
for (TaskContext.MetaInfo each : metaInfoList) {
regCenter.remove(FailoverNode.getFailoverTaskNodePath(each.toString()));
}
}
/**
* 从失效转移队列中查找任务.
*
* @param metaInfo 任务元信息
* @return 失效转移任务Id
*/
public Optional<String> getTaskId(final TaskContext.MetaInfo metaInfo) {
String failoverTaskNodePath = FailoverNode.getFailoverTaskNodePath(metaInfo.toString());
Optional<String> result = Optional.absent();
if (regCenter.isExisted(failoverTaskNodePath)) {
result = Optional.of(regCenter.get(failoverTaskNodePath));
}
return result;
}
/**
* 获取待失效转移的全部任务.
*
* @return 待失效转移的全部任务
*/
public Map<String, Collection<FailoverTaskInfo>> getAllFailoverTasks() {
if (!regCenter.isExisted(FailoverNode.ROOT)) {
return Collections.emptyMap();
}
List<String> jobNames = regCenter.getChildrenKeys(FailoverNode.ROOT);
Map<String, Collection<FailoverTaskInfo>> result = new HashMap<>(jobNames.size(), 1);
for (String each : jobNames) {
Collection<FailoverTaskInfo> failoverTasks = getFailoverTasks(each);
if (!failoverTasks.isEmpty()) {
result.put(each, failoverTasks);
}
}
return result;
}
/**
* 获取待失效转移的任务集合.
*
* @param jobName 作业名称
* @return 待失效转移的任务集合
*/
private Collection<FailoverTaskInfo> getFailoverTasks(final String jobName) {
List<String> failOverTasks = regCenter.getChildrenKeys(FailoverNode.getFailoverJobNodePath(jobName));
List<FailoverTaskInfo> result = new ArrayList<>(failOverTasks.size());
for (String each : failOverTasks) {
String originalTaskId = regCenter.get(FailoverNode.getFailoverTaskNodePath(each));
if (!Strings.isNullOrEmpty(originalTaskId)) {
result.add(new FailoverTaskInfo(TaskContext.MetaInfo.from(each), originalTaskId));
}
}
return result;
}
}