/*
* Copyright 1999-2015 dangdang.com.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package io.elasticjob.cloud.executor;

import io.elasticjob.cloud.api.ElasticJob;
import io.elasticjob.cloud.config.JobRootConfiguration;
import io.elasticjob.cloud.exception.JobSystemException;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
import org.apache.mesos.ExecutorDriver;
import org.apache.mesos.Protos;
import org.quartz.CronScheduleBuilder;
import org.quartz.CronTrigger;
import org.quartz.Job;
import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;
import org.quartz.plugins.management.ShutdownHookPlugin;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

/**
* Daemon task scheduler.
*
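* <p>Illustrative lifecycle sketch (the real wiring is done by the Mesos executor;
* the variable names here are placeholders):
* <pre>
* DaemonTaskScheduler scheduler = new DaemonTaskScheduler(job, config, facade, driver, taskId);
* scheduler.init();                        // schedule by cron and start
* DaemonTaskScheduler.shutdown(taskId);    // later: stop and deregister the scheduler
* </pre>
*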
* @author zhangliang
* @author caohao
*/
@RequiredArgsConstructor
public final class DaemonTaskScheduler {
public static final String ELASTIC_JOB_DATA_MAP_KEY = "elasticJob";
private static final String JOB_FACADE_DATA_MAP_KEY = "jobFacade";
private static final String EXECUTOR_DRIVER_DATA_MAP_KEY = "executorDriver";
private static final String TASK_ID_DATA_MAP_KEY = "taskId";
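// Running schedulers, keyed by Quartz scheduler instance name, which is set to the Mesos task ID value in getBaseQuartzProperties().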
private static final ConcurrentHashMap<String, Scheduler> RUNNING_SCHEDULERS = new ConcurrentHashMap<>(1024, 1);
private final ElasticJob elasticJob;
private final JobRootConfiguration jobRootConfig;
private final JobFacade jobFacade;
private final ExecutorDriver executorDriver;
private final Protos.TaskID taskId;

/**
* Initialize the job.
*/
public void init() {
JobDetail jobDetail = JobBuilder.newJob(DaemonJob.class).withIdentity(jobRootConfig.getTypeConfig().getCoreConfig().getJobName()).build();
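// Hand the collaborators to the DaemonJob instance through the Quartz JobDataMap;
// Quartz injects each entry through the matching setter on DaemonJob.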
jobDetail.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJob);
jobDetail.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
jobDetail.getJobDataMap().put(EXECUTOR_DRIVER_DATA_MAP_KEY, executorDriver);
jobDetail.getJobDataMap().put(TASK_ID_DATA_MAP_KEY, taskId);
try {
scheduleJob(initializeScheduler(), jobDetail, taskId.getValue(), jobRootConfig.getTypeConfig().getCoreConfig().getCron());
} catch (final SchedulerException ex) {
throw new JobSystemException(ex);
}
}

private Scheduler initializeScheduler() throws SchedulerException {
StdSchedulerFactory factory = new StdSchedulerFactory();
factory.initialize(getBaseQuartzProperties());
return factory.getScheduler();
}

private Properties getBaseQuartzProperties() {
Properties result = new Properties();
result.put("org.quartz.threadPool.class", org.quartz.simpl.SimpleThreadPool.class.getName());
result.put("org.quartz.threadPool.threadCount", "1");
result.put("org.quartz.scheduler.instanceName", taskId.getValue());
if (!jobRootConfig.getTypeConfig().getCoreConfig().isMisfire()) {
result.put("org.quartz.jobStore.misfireThreshold", "1");
}
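// Register Quartz's shutdown hook plugin so the scheduler is closed cleanly when the JVM exits.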
result.put("org.quartz.plugin.shutdownhook.class", ShutdownHookPlugin.class.getName());
result.put("org.quartz.plugin.shutdownhook.cleanShutdown", Boolean.TRUE.toString());
return result;
}

private void scheduleJob(final Scheduler scheduler, final JobDetail jobDetail, final String triggerIdentity, final String cron) {
try {
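// Guard against double scheduling: register the job and trigger only if the job key is not already present.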
if (!scheduler.checkExists(jobDetail.getKey())) {
scheduler.scheduleJob(jobDetail, createTrigger(triggerIdentity, cron));
}
scheduler.start();
RUNNING_SCHEDULERS.putIfAbsent(scheduler.getSchedulerName(), scheduler);
} catch (final SchedulerException ex) {
throw new JobSystemException(ex);
}
}

private CronTrigger createTrigger(final String triggerIdentity, final String cron) {
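// DoNothing misfire handling: on a misfire, advance to the next fire time in the cron schedule instead of firing immediately.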
return TriggerBuilder.newTrigger().withIdentity(triggerIdentity).withSchedule(CronScheduleBuilder.cronSchedule(cron).withMisfireHandlingInstructionDoNothing()).build();
}

/**
* Shut down the task scheduling.
*
* @param taskID task ID
*/
public static void shutdown(final Protos.TaskID taskID) {
Scheduler scheduler = RUNNING_SCHEDULERS.remove(taskID.getValue());
if (null != scheduler) {
try {
scheduler.shutdown();
} catch (final SchedulerException ex) {
throw new JobSystemException(ex);
}
}
}

/**
* Daemon job.
*
* @author zhangliang
*/
public static final class DaemonJob implements Job {
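// Populated by Quartz from the JobDataMap when the job instance is created; the setters make the entries injectable.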
@Setter
private ElasticJob elasticJob;
@Setter
private JobFacade jobFacade;
@Setter
private ExecutorDriver executorDriver;
@Setter
private Protos.TaskID taskId;

@Override
public void execute(final JobExecutionContext context) throws JobExecutionException {
ShardingContexts shardingContexts = jobFacade.getShardingContexts();
int jobEventSamplingCount = shardingContexts.getJobEventSamplingCount();
int currentJobEventSamplingCount = shardingContexts.getCurrentJobEventSamplingCount();
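// Job event sampling: only every jobEventSamplingCount-th execution reports job events and sends
// Mesos TASK_RUNNING status updates (BEGIN/COMPLETE); intermediate runs execute silently to reduce event traffic.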
if (jobEventSamplingCount > 0 && ++currentJobEventSamplingCount < jobEventSamplingCount) {
shardingContexts.setCurrentJobEventSamplingCount(currentJobEventSamplingCount);
jobFacade.getShardingContexts().setAllowSendJobEvent(false);
JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
} else {
jobFacade.getShardingContexts().setAllowSendJobEvent(true);
executorDriver.sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(taskId).setState(Protos.TaskState.TASK_RUNNING).setMessage("BEGIN").build());
JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
executorDriver.sendStatusUpdate(Protos.TaskStatus.newBuilder().setTaskId(taskId).setState(Protos.TaskState.TASK_RUNNING).setMessage("COMPLETE").build());
shardingContexts.setCurrentJobEventSamplingCount(0);
}
}
}
}