blob: f41dac7259293268055cb9abc90a97bd33703870 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.elasticjob.lite.internal.sharding;
import com.google.common.base.Strings;
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.infra.handler.sharding.JobInstance;
import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts;
import org.apache.shardingsphere.elasticjob.lite.internal.config.ConfigurationService;
import org.apache.shardingsphere.elasticjob.lite.internal.schedule.JobRegistry;
import org.apache.shardingsphere.elasticjob.lite.internal.storage.JobNodeStorage;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
* Execution service.
*/
public final class ExecutionService {
private final String jobName;
private final JobNodeStorage jobNodeStorage;
private final ConfigurationService configService;
public ExecutionService(final CoordinatorRegistryCenter regCenter, final String jobName) {
this.jobName = jobName;
jobNodeStorage = new JobNodeStorage(regCenter, jobName);
configService = new ConfigurationService(regCenter, jobName);
}
/**
* Register job begin.
*
* @param shardingContexts sharding contexts
*/
public void registerJobBegin(final ShardingContexts shardingContexts) {
JobRegistry.getInstance().setJobRunning(jobName, true);
JobConfiguration jobConfiguration = configService.load(true);
if (!jobConfiguration.isMonitorExecution()) {
return;
}
String jobInstanceId = JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId();
for (int each : shardingContexts.getShardingItemParameters().keySet()) {
if (jobConfiguration.isFailover()) {
jobNodeStorage.fillJobNode(ShardingNode.getRunningNode(each), jobInstanceId);
} else {
jobNodeStorage.fillEphemeralJobNode(ShardingNode.getRunningNode(each), jobInstanceId);
}
}
}
/**
* Register job completed.
*
* @param shardingContexts sharding contexts
*/
public void registerJobCompleted(final ShardingContexts shardingContexts) {
JobRegistry.getInstance().setJobRunning(jobName, false);
if (!configService.load(true).isMonitorExecution()) {
return;
}
for (int each : shardingContexts.getShardingItemParameters().keySet()) {
jobNodeStorage.removeJobNodeIfExisted(ShardingNode.getRunningNode(each));
}
}
/**
* Clear all running info.
*/
public void clearAllRunningInfo() {
clearRunningInfo(getAllItems());
}
/**
* Clear running info.
*
* @param items sharding items which need to be cleared
*/
public void clearRunningInfo(final List<Integer> items) {
for (int each : items) {
jobNodeStorage.removeJobNodeIfExisted(ShardingNode.getRunningNode(each));
}
}
/**
* Judge has running items or not.
*
* @param items sharding items need to be judged
* @return has running items or not
*/
public boolean hasRunningItems(final Collection<Integer> items) {
JobConfiguration jobConfig = configService.load(true);
if (!jobConfig.isMonitorExecution()) {
return false;
}
for (int each : items) {
if (jobNodeStorage.isJobNodeExisted(ShardingNode.getRunningNode(each))) {
return true;
}
}
return false;
}
/**
* Judge has running items or not.
*
* @return has running items or not
*/
public boolean hasRunningItems() {
return hasRunningItems(getAllItems());
}
private List<Integer> getAllItems() {
int shardingTotalCount = configService.load(true).getShardingTotalCount();
List<Integer> result = new ArrayList<>(shardingTotalCount);
for (int i = 0; i < shardingTotalCount; i++) {
result.add(i);
}
return result;
}
/**
* Get all running items with instance.
*
* @return running items with instance.
*/
public Map<Integer, JobInstance> getAllRunningItems() {
int shardingTotalCount = configService.load(true).getShardingTotalCount();
Map<Integer, JobInstance> result = new LinkedHashMap<>(shardingTotalCount, 1);
for (int i = 0; i < shardingTotalCount; i++) {
String data = jobNodeStorage.getJobNodeData(ShardingNode.getRunningNode(i));
if (!Strings.isNullOrEmpty(data)) {
result.put(i, new JobInstance(data));
}
}
return result;
}
/**
* Set misfire flag if sharding items still running.
*
* @param items sharding items need to be set misfire flag
* @return is misfired for this schedule time or not
*/
public boolean misfireIfHasRunningItems(final Collection<Integer> items) {
if (!hasRunningItems(items)) {
return false;
}
setMisfire(items);
return true;
}
/**
* Set misfire flag if sharding items still running.
*
* @param items sharding items need to be set misfire flag
*/
public void setMisfire(final Collection<Integer> items) {
for (int each : items) {
jobNodeStorage.createJobNodeIfNeeded(ShardingNode.getMisfireNode(each));
}
}
/**
* Get misfired job sharding items.
*
* @param items sharding items need to be judged
* @return misfired job sharding items
*/
public List<Integer> getMisfiredJobItems(final Collection<Integer> items) {
List<Integer> result = new ArrayList<>(items.size());
for (int each : items) {
if (jobNodeStorage.isJobNodeExisted(ShardingNode.getMisfireNode(each))) {
result.add(each);
}
}
return result;
}
/**
* Clear misfire flag.
*
* @param items sharding items need to be cleared
*/
public void clearMisfire(final Collection<Integer> items) {
for (int each : items) {
jobNodeStorage.removeJobNodeIfExisted(ShardingNode.getMisfireNode(each));
}
}
/**
* Get disabled sharding items.
*
* @param items sharding items need to be got
* @return disabled sharding items
*/
public List<Integer> getDisabledItems(final List<Integer> items) {
List<Integer> result = new ArrayList<>(items.size());
for (int each : items) {
if (jobNodeStorage.isJobNodeExisted(ShardingNode.getDisabledNode(each))) {
result.add(each);
}
}
return result;
}
}