blob: 89e1409f9a30d804e463e5878702ce578cca47be [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.elasticjob.cloud.scheduler.state.ready;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfigurationService;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobExecutionType;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.context.JobContext;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.env.BootstrapEnvironment;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.state.running.RunningService;
import org.apache.shardingsphere.elasticjob.cloud.context.ExecutionType;
import org.apache.shardingsphere.elasticjob.cloud.reg.base.CoordinatorRegistryCenter;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Strings;
import com.google.common.collect.Collections2;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * Ready service.
 *
 * <p>Manages the ready queue kept in the registry center. Each queued job has a node under
 * {@code ReadyNode.ROOT} whose value is a decimal counter, incremented by {@link #addTransient(String)}
 * for misfire-enabled jobs and decremented by {@link #remove(Collection)}.</p>
 *
 * <p>NOTE(review): counter updates are a read followed by a separate persist call; whether concurrent
 * callers are serialized elsewhere cannot be determined from this class - confirm before relying on it.</p>
 */
@Slf4j
public final class ReadyService {
    
    private final BootstrapEnvironment env = BootstrapEnvironment.getInstance();
    
    private final CoordinatorRegistryCenter regCenter;
    
    private final CloudJobConfigurationService configService;
    
    private final RunningService runningService;
    
    public ReadyService(final CoordinatorRegistryCenter regCenter) {
        this.regCenter = regCenter;
        configService = new CloudJobConfigurationService(regCenter);
        runningService = new RunningService(regCenter);
    }
    
    /**
     * Add transient job to ready queue.
     *
     * <p>Nothing is added when the ready queue already holds more children than the configured job state
     * queue size (a warning is logged), when the job has no configuration, or when the job is not a
     * transient job. If misfire is enabled for the job, the stored counter is incremented (a missing or
     * empty value counts as 0); otherwise the counter is set to 1.</p>
     *
     * @param jobName job name
     * @throws NumberFormatException if the stored counter is non-empty but not a valid integer
     */
    public void addTransient(final String jobName) {
        if (regCenter.getNumChildren(ReadyNode.ROOT) > env.getFrameworkConfiguration().getJobStateQueueSize()) {
            log.warn("Cannot add transient job, caused by read state queue size is larger than {}.", env.getFrameworkConfiguration().getJobStateQueueSize());
            return;
        }
        Optional<CloudJobConfiguration> cloudJobConfig = configService.load(jobName);
        if (!cloudJobConfig.isPresent() || CloudJobExecutionType.TRANSIENT != cloudJobConfig.get().getJobExecutionType()) {
            return;
        }
        String readyJobNode = ReadyNode.getReadyJobNodePath(jobName);
        String times = regCenter.getDirectly(readyJobNode);
        if (cloudJobConfig.get().getTypeConfig().getCoreConfig().isMisfire()) {
            // Misfire enabled: every trigger is remembered by bumping the counter.
            regCenter.persist(readyJobNode, Integer.toString(parseTimes(times) + 1));
        } else {
            // Misfire disabled: repeated triggers collapse into a single pending run.
            regCenter.persist(readyJobNode, "1");
        }
    }
    
    /**
     * Add daemon job to ready queue.
     *
     * <p>Nothing is added when the ready queue already holds more children than the configured job state
     * queue size (a warning is logged), when the job has no configuration, when the job is not a daemon
     * job, or when the job is reported as running. Otherwise the counter is set to 1.</p>
     *
     * @param jobName job name
     */
    public void addDaemon(final String jobName) {
        if (regCenter.getNumChildren(ReadyNode.ROOT) > env.getFrameworkConfiguration().getJobStateQueueSize()) {
            log.warn("Cannot add daemon job, caused by read state queue size is larger than {}.", env.getFrameworkConfiguration().getJobStateQueueSize());
            return;
        }
        Optional<CloudJobConfiguration> cloudJobConfig = configService.load(jobName);
        if (!cloudJobConfig.isPresent() || CloudJobExecutionType.DAEMON != cloudJobConfig.get().getJobExecutionType() || runningService.isJobRunning(jobName)) {
            return;
        }
        regCenter.persist(ReadyNode.getReadyJobNodePath(jobName), "1");
    }
    
    /**
     * Set misfire disabled.
     *
     * <p>Resets the ready counter of the job to 1, but only when the job has a configuration and
     * already has a node in the ready queue.</p>
     *
     * @param jobName job name
     */
    public void setMisfireDisabled(final String jobName) {
        Optional<CloudJobConfiguration> cloudJobConfig = configService.load(jobName);
        String readyJobNode = ReadyNode.getReadyJobNodePath(jobName);
        if (cloudJobConfig.isPresent() && null != regCenter.getDirectly(readyJobNode)) {
            regCenter.persist(readyJobNode, "1");
        }
    }
    
    /**
     * Get all the eligible job contexts from ready queue.
     *
     * <p>A queued job is skipped when its name appears in {@code ineligibleJobContexts} or when it is
     * reported as running. A queued job without a configuration has its ready node removed.</p>
     *
     * @param ineligibleJobContexts ineligible job contexts
     * @return collection of eligible contexts, empty if the ready queue root does not exist
     */
    public Collection<JobContext> getAllEligibleJobContexts(final Collection<JobContext> ineligibleJobContexts) {
        if (!regCenter.isExisted(ReadyNode.ROOT)) {
            return Collections.emptyList();
        }
        // Collections2.transform returns a lazy view whose contains() re-applies the function and scans
        // linearly on every call; snapshot the names into a hash set once so each lookup below is O(1).
        Set<String> ineligibleJobNames = new HashSet<>(Collections2.transform(ineligibleJobContexts, new Function<JobContext, String>() {
            
            @Override
            public String apply(final JobContext input) {
                return input.getJobConfig().getJobName();
            }
        }));
        List<String> jobNames = regCenter.getChildrenKeys(ReadyNode.ROOT);
        List<JobContext> result = new ArrayList<>(jobNames.size());
        for (String each : jobNames) {
            if (ineligibleJobNames.contains(each)) {
                continue;
            }
            Optional<CloudJobConfiguration> jobConfig = configService.load(each);
            if (!jobConfig.isPresent()) {
                // The job configuration is gone, so drop the orphaned ready node.
                regCenter.remove(ReadyNode.getReadyJobNodePath(each));
                continue;
            }
            if (!runningService.isJobRunning(each)) {
                result.add(JobContext.from(jobConfig.get(), ExecutionType.READY));
            }
        }
        return result;
    }
    
    /**
     * Remove jobs from ready queue.
     *
     * <p>For each job the stored counter is decremented; the node itself is removed once the counter
     * is 1 or less (a missing or empty value counts as 0).</p>
     *
     * @param jobNames collection of jobs to be removed
     * @throws NumberFormatException if a stored counter is non-empty but not a valid integer
     */
    public void remove(final Collection<String> jobNames) {
        for (String each : jobNames) {
            String readyJobNode = ReadyNode.getReadyJobNodePath(each);
            int times = parseTimes(regCenter.getDirectly(readyJobNode));
            if (times <= 1) {
                regCenter.remove(readyJobNode);
            } else {
                regCenter.persist(readyJobNode, Integer.toString(times - 1));
            }
        }
    }
    
    /**
     * Get all ready tasks.
     *
     * <p>Nodes whose value is null or empty are left out of the result.</p>
     *
     * @return all ready tasks as a map of job name to stored counter, empty if the ready queue root does not exist
     * @throws NumberFormatException if a stored counter is non-empty but not a valid integer
     */
    public Map<String, Integer> getAllReadyTasks() {
        if (!regCenter.isExisted(ReadyNode.ROOT)) {
            return Collections.emptyMap();
        }
        List<String> jobNames = regCenter.getChildrenKeys(ReadyNode.ROOT);
        Map<String, Integer> result = new HashMap<>(jobNames.size(), 1);
        for (String each : jobNames) {
            String times = regCenter.get(ReadyNode.getReadyJobNodePath(each));
            if (!Strings.isNullOrEmpty(times)) {
                result.put(each, Integer.parseInt(times));
            }
        }
        return result;
    }
    
    /**
     * Parse the counter stored in a ready node.
     *
     * @param times raw node value, may be null or empty
     * @return parsed counter, or 0 when the value is null or empty
     * @throws NumberFormatException if the value is non-empty but not a valid integer
     */
    private static int parseTimes(final String times) {
        return Strings.isNullOrEmpty(times) ? 0 : Integer.parseInt(times);
    }
}