blob: 2858da3629da7ee6e8b02976235c383c9554505b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.shardingsphere.elasticjob.cloud.scheduler.mesos;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.app.CloudAppConfigurationService;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.state.disable.app.DisableAppService;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.state.disable.job.DisableJobService;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.state.running.RunningService;
import org.apache.shardingsphere.elasticjob.cloud.context.ExecutionType;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.app.CloudAppConfiguration;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobConfigurationService;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.config.job.CloudJobExecutionType;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.context.JobContext;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.state.failover.FailoverService;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.state.failover.FailoverTaskInfo;
import org.apache.shardingsphere.elasticjob.cloud.scheduler.state.ready.ReadyService;
import org.apache.shardingsphere.elasticjob.cloud.context.TaskContext;
import org.apache.shardingsphere.elasticjob.cloud.reg.base.CoordinatorRegistryCenter;
import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.codehaus.jettison.json.JSONException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
 * Mesos facade service.
 *
 * <p>Single entry point that delegates to the app/job configuration services and to the
 * ready, running, failover, disable and Mesos state services, all of which are built
 * from the same registry center.</p>
 */
@Slf4j
public final class FacadeService {
    
    private final CloudAppConfigurationService appConfigService;
    
    private final CloudJobConfigurationService jobConfigService;
    
    private final ReadyService readyService;
    
    private final RunningService runningService;
    
    private final FailoverService failoverService;
    
    private final DisableAppService disableAppService;
    
    private final DisableJobService disableJobService;
    
    private final MesosStateService mesosStateService;
    
    /**
     * Create the facade and every service it delegates to.
     *
     * @param regCenter registry center handed to each underlying service
     */
    public FacadeService(final CoordinatorRegistryCenter regCenter) {
        appConfigService = new CloudAppConfigurationService(regCenter);
        jobConfigService = new CloudJobConfigurationService(regCenter);
        readyService = new ReadyService(regCenter);
        runningService = new RunningService(regCenter);
        failoverService = new FailoverService(regCenter);
        disableAppService = new DisableAppService(regCenter);
        disableJobService = new DisableJobService(regCenter);
        mesosStateService = new MesosStateService(regCenter);
    }
    
    /**
     * Start facade service.
     *
     * <p>Logs the start and starts the running service.</p>
     */
    public void start() {
        log.info("Elastic Job: Start facade service");
        runningService.start();
    }
    
    /**
     * Get eligible job contexts.
     *
     * <p>The result lists the failover contexts first, followed by the ready contexts.
     * The failover contexts are handed to the ready service when it is queried.</p>
     *
     * @return collection of eligible job contexts
     */
    public Collection<JobContext> getEligibleJobContext() {
        Collection<JobContext> fromFailover = failoverService.getAllEligibleJobContexts();
        Collection<JobContext> fromReady = readyService.getAllEligibleJobContexts(fromFailover);
        List<JobContext> eligible = new ArrayList<>(fromFailover.size() + fromReady.size());
        eligible.addAll(fromFailover);
        eligible.addAll(fromReady);
        return eligible;
    }
    
    /**
     * Remove launched tasks from the failover and ready queues.
     *
     * <p>Tasks of type {@code FAILOVER} are removed from the failover service by meta info,
     * tasks of type {@code READY} are removed from the ready service by job name; any other
     * type is ignored.</p>
     *
     * @param taskContexts task running contexts
     */
    public void removeLaunchTasksFromQueue(final List<TaskContext> taskContexts) {
        int launchedCount = taskContexts.size();
        List<TaskContext> launchedFailoverTasks = new ArrayList<>(launchedCount);
        Collection<String> launchedReadyJobs = new HashSet<>(launchedCount, 1);
        for (TaskContext launched : taskContexts) {
            switch (launched.getType()) {
                case FAILOVER:
                    launchedFailoverTasks.add(launched);
                    break;
                case READY:
                    launchedReadyJobs.add(launched.getMetaInfo().getJobName());
                    break;
                default:
                    break;
            }
        }
        failoverService.remove(toMetaInfos(launchedFailoverTasks));
        readyService.remove(launchedReadyJobs);
    }
    
    /**
     * Expose the given task contexts as a list of their meta infos.
     *
     * <p>The returned list is the view produced by {@code Lists.transform}, not a copy.</p>
     *
     * @param taskContexts task running contexts
     * @return view of the meta info of each task context
     */
    private static List<TaskContext.MetaInfo> toMetaInfos(final List<TaskContext> taskContexts) {
        Function<TaskContext, TaskContext.MetaInfo> metaInfoOf = new Function<TaskContext, TaskContext.MetaInfo>() {
            
            @Override
            public TaskContext.MetaInfo apply(final TaskContext input) {
                return input.getMetaInfo();
            }
        };
        return Lists.transform(taskContexts, metaInfoOf);
    }
    
    /**
     * Add task to running queue.
     *
     * @param taskContext task running context
     */
    public void addRunning(final TaskContext taskContext) {
        runningService.add(taskContext);
    }
    
    /**
     * Update the idle flag of a daemon task.
     *
     * @param taskContext task running context
     * @param isIdle set to idle or not
     */
    public void updateDaemonStatus(final TaskContext taskContext, final boolean isIdle) {
        runningService.updateIdle(taskContext, isIdle);
    }
    
    /**
     * Remove task from running queue.
     *
     * @param taskContext task running context
     */
    public void removeRunning(final TaskContext taskContext) {
        runningService.remove(taskContext);
    }
    
    /**
     * Record task to failover queue.
     *
     * <p>Nothing is recorded when the job configuration cannot be loaded or when the job
     * or its app is disabled. Otherwise the task is recorded if the job has failover
     * switched on or is a daemon job.</p>
     *
     * @param taskContext task running context
     */
    public void recordFailoverTask(final TaskContext taskContext) {
        Optional<CloudJobConfiguration> enabledJobConfig = loadEnabledJobConfig(taskContext.getMetaInfo().getJobName());
        if (!enabledJobConfig.isPresent()) {
            return;
        }
        CloudJobConfiguration jobConfig = enabledJobConfig.get();
        boolean failoverWanted = jobConfig.getTypeConfig().getCoreConfig().isFailover()
                || CloudJobExecutionType.DAEMON == jobConfig.getJobExecutionType();
        if (failoverWanted) {
            failoverService.add(taskContext);
        }
    }
    
    /**
     * Load a job configuration, treating a disabled job like a missing one.
     *
     * @param jobName job name
     * @return the job configuration, or absent when it does not exist or is disabled
     */
    private Optional<CloudJobConfiguration> loadEnabledJobConfig(final String jobName) {
        Optional<CloudJobConfiguration> loaded = jobConfigService.load(jobName);
        if (loaded.isPresent() && isDisable(loaded.get())) {
            return Optional.absent();
        }
        return loaded;
    }
    
    /**
     * Check whether the app or the job of the given configuration is disabled.
     *
     * @param jobConfiguration cloud job configuration
     * @return true if the app or the job is disabled
     */
    private boolean isDisable(final CloudJobConfiguration jobConfiguration) {
        if (disableAppService.isDisabled(jobConfiguration.getAppName())) {
            return true;
        }
        return disableJobService.isDisabled(jobConfiguration.getJobName());
    }
    
    /**
     * Add transient job to ready queue.
     *
     * @param jobName job name
     */
    public void addTransient(final String jobName) {
        readyService.addTransient(jobName);
    }
    
    /**
     * Load cloud job config.
     *
     * @param jobName job name
     * @return cloud job config
     */
    public Optional<CloudJobConfiguration> load(final String jobName) {
        return jobConfigService.load(jobName);
    }
    
    /**
     * Load app config by app name.
     *
     * @param appName app name
     * @return cloud app config
     */
    public Optional<CloudAppConfiguration> loadAppConfig(final String appName) {
        return appConfigService.load(appName);
    }
    
    /**
     * Get failover task id by task meta info.
     *
     * @param metaInfo task meta info
     * @return failover task id
     */
    public Optional<String> getFailoverTaskId(final TaskContext.MetaInfo metaInfo) {
        return failoverService.getTaskId(metaInfo);
    }
    
    /**
     * Add daemon job to ready queue.
     *
     * <p>Does nothing when the job configuration cannot be loaded or when the job or its
     * app is disabled.</p>
     *
     * @param jobName job name
     */
    public void addDaemonJobToReadyQueue(final String jobName) {
        if (loadEnabledJobConfig(jobName).isPresent()) {
            readyService.addDaemon(jobName);
        }
    }
    
    /**
     * Determine whether the task is running or not.
     *
     * <p>A failover task is looked up by its own meta info; any other task counts as
     * running when its job has at least one running task.</p>
     *
     * @param taskContext task running context
     * @return true if running, otherwise false
     */
    public boolean isRunning(final TaskContext taskContext) {
        if (ExecutionType.FAILOVER == taskContext.getType()) {
            return runningService.isTaskRunning(taskContext.getMetaInfo());
        }
        String jobName = taskContext.getMetaInfo().getJobName();
        return !runningService.getRunningTasks(jobName).isEmpty();
    }
    
    /**
     * Add mapping of the task primary key and host name.
     *
     * @param taskId task primary key
     * @param hostname host name
     */
    public void addMapping(final String taskId, final String hostname) {
        runningService.addMapping(taskId, hostname);
    }
    
    /**
     * Retrieve the host name mapped to a task and remove the mapping.
     *
     * @param taskId task primary key
     * @return host name of the removed task
     */
    public String popMapping(final String taskId) {
        return runningService.popMapping(taskId);
    }
    
    /**
     * Get all ready tasks.
     *
     * @return ready tasks
     */
    public Map<String, Integer> getAllReadyTasks() {
        return readyService.getAllReadyTasks();
    }
    
    /**
     * Get all running tasks.
     *
     * @return running tasks
     */
    public Map<String, Set<TaskContext>> getAllRunningTasks() {
        return runningService.getAllRunningTasks();
    }
    
    /**
     * Get all failover tasks.
     *
     * @return failover tasks
     */
    public Map<String, Collection<FailoverTaskInfo>> getAllFailoverTasks() {
        return failoverService.getAllFailoverTasks();
    }
    
    /**
     * Determine whether the job is disabled or not.
     *
     * <p>A job whose configuration cannot be loaded is reported as disabled.</p>
     *
     * @param jobName job name
     * @return true if disabled, otherwise false
     */
    public boolean isJobDisabled(final String jobName) {
        Optional<CloudJobConfiguration> loaded = jobConfigService.load(jobName);
        if (!loaded.isPresent()) {
            return true;
        }
        if (disableAppService.isDisabled(loaded.get().getAppName())) {
            return true;
        }
        return disableJobService.isDisabled(jobName);
    }
    
    /**
     * Enable job.
     *
     * @param jobName job name
     */
    public void enableJob(final String jobName) {
        disableJobService.remove(jobName);
    }
    
    /**
     * Disable job.
     *
     * @param jobName job name
     */
    public void disableJob(final String jobName) {
        disableJobService.add(jobName);
    }
    
    /**
     * Get all running executor info.
     *
     * @return collection of executor info
     * @throws JSONException json exception
     */
    public Collection<MesosStateService.ExecutorStateInfo> loadExecutorInfo() throws JSONException {
        return mesosStateService.executors();
    }
    
    /**
     * Stop facade service.
     *
     * <p>Logs the stop and clears the running service.</p>
     */
    public void stop() {
        log.info("Elastic Job: Stop facade service");
        // TODO stop scheduler
        runningService.clear();
    }
}