blob: 89fe38edb08deafc01772352ff4cc1c94cefb133 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.fineract.infrastructure.jobs.service;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.TimeZone;
import javax.annotation.PostConstruct;
import org.apache.fineract.infrastructure.core.domain.FineractPlatformTenant;
import org.apache.fineract.infrastructure.core.exception.PlatformInternalServerException;
import org.apache.fineract.infrastructure.core.service.ThreadLocalContextUtil;
import org.apache.fineract.infrastructure.jobs.annotation.CronMethodParser;
import org.apache.fineract.infrastructure.jobs.annotation.CronMethodParser.ClassMethodNamesPair;
import org.apache.fineract.infrastructure.jobs.domain.ScheduledJobDetail;
import org.apache.fineract.infrastructure.jobs.domain.SchedulerDetail;
import org.apache.fineract.infrastructure.jobs.exception.JobNotFoundException;
import org.apache.fineract.infrastructure.security.service.TenantDetailsService;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobKey;
import org.quartz.JobListener;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.scheduling.quartz.CronTriggerFactoryBean;
import org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Service;
/**
* Service class to create and load batch jobs to Scheduler using
* {@link SchedulerFactoryBean} ,{@link MethodInvokingJobDetailFactoryBean} and
* {@link CronTriggerFactoryBean}
*/
@Service
public class JobRegisterServiceImpl implements JobRegisterService, ApplicationListener<ContextClosedEvent> {
private final static Logger logger = LoggerFactory.getLogger(JobRegisterServiceImpl.class);
// MIFOSX-1184: This class cannot use constructor injection, because one of
// its dependencies (SchedulerStopListener) has a circular dependency to
// itself. So, slightly differently from how it's done elsewhere in this
// code base, the following fields are not final, and there is no
// constructor, but setters.
private ApplicationContext applicationContext;
private SchedularWritePlatformService schedularWritePlatformService;
private TenantDetailsService tenantDetailsService;
private SchedulerJobListener schedulerJobListener;
private SchedulerStopListener schedulerStopListener;
private SchedulerTriggerListener globalSchedulerTriggerListener;
private final HashMap<String, Scheduler> schedulers = new HashMap<>(4);
@Autowired
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = applicationContext;
}
@Autowired
public void setSchedularWritePlatformService(SchedularWritePlatformService schedularWritePlatformService) {
this.schedularWritePlatformService = schedularWritePlatformService;
}
@Autowired
public void setTenantDetailsService(TenantDetailsService tenantDetailsService) {
this.tenantDetailsService = tenantDetailsService;
}
@Autowired
public void setSchedulerJobListener(SchedulerJobListener schedulerJobListener) {
this.schedulerJobListener = schedulerJobListener;
}
@Autowired
public void setSchedulerStopListener(SchedulerStopListener schedulerStopListener) {
this.schedulerStopListener = schedulerStopListener;
}
@Autowired
public void setGlobalTriggerListener(SchedulerTriggerListener globalTriggerListener) {
this.globalSchedulerTriggerListener = globalTriggerListener;
}
@PostConstruct
public void loadAllJobs() {
final List<FineractPlatformTenant> allTenants = this.tenantDetailsService.findAllTenants();
for (final FineractPlatformTenant tenant : allTenants) {
ThreadLocalContextUtil.setTenant(tenant);
final List<ScheduledJobDetail> scheduledJobDetails = this.schedularWritePlatformService.retrieveAllJobs();
for (final ScheduledJobDetail jobDetails : scheduledJobDetails) {
scheduleJob(jobDetails);
jobDetails.updateTriggerMisfired(false);
this.schedularWritePlatformService.saveOrUpdate(jobDetails);
}
final SchedulerDetail schedulerDetail = this.schedularWritePlatformService.retriveSchedulerDetail();
if (schedulerDetail.isResetSchedulerOnBootup()) {
schedulerDetail.updateSuspendedState(false);
this.schedularWritePlatformService.updateSchedulerDetail(schedulerDetail);
}
}
}
public void executeJob(final ScheduledJobDetail scheduledJobDetail, String triggerType) {
try {
final JobDataMap jobDataMap = new JobDataMap();
if (triggerType == null) {
triggerType = SchedulerServiceConstants.TRIGGER_TYPE_APPLICATION;
}
jobDataMap.put(SchedulerServiceConstants.TRIGGER_TYPE_REFERENCE, triggerType);
jobDataMap.put(SchedulerServiceConstants.TENANT_IDENTIFIER, ThreadLocalContextUtil.getTenant().getTenantIdentifier());
final String key = scheduledJobDetail.getJobKey();
final JobKey jobKey = constructJobKey(key);
final String schedulerName = getSchedulerName(scheduledJobDetail);
final Scheduler scheduler = this.schedulers.get(schedulerName);
if (scheduler == null || !scheduler.checkExists(jobKey)) {
final JobDetail jobDetail = createJobDetail(scheduledJobDetail);
final String tempSchedulerName = "temp" + scheduledJobDetail.getId();
final Scheduler tempScheduler = createScheduler(tempSchedulerName, 1, schedulerJobListener, schedulerStopListener);
tempScheduler.addJob(jobDetail, true);
jobDataMap.put(SchedulerServiceConstants.SCHEDULER_NAME, tempSchedulerName);
this.schedulers.put(tempSchedulerName, tempScheduler);
tempScheduler.triggerJob(jobDetail.getKey(), jobDataMap);
} else {
scheduler.triggerJob(jobKey, jobDataMap);
}
} catch (final Exception e) {
final String msg = "Job execution failed for job with id:" + scheduledJobDetail.getId();
logger.error(msg, e);
throw new PlatformInternalServerException("error.msg.sheduler.job.execution.failed", msg, scheduledJobDetail.getId());
}
}
public void rescheduleJob(final ScheduledJobDetail scheduledJobDetail) {
try {
final String jobIdentity = scheduledJobDetail.getJobKey();
final JobKey jobKey = constructJobKey(jobIdentity);
final String schedulername = getSchedulerName(scheduledJobDetail);
final Scheduler scheduler = this.schedulers.get(schedulername);
if (scheduler != null) {
scheduler.deleteJob(jobKey);
}
scheduleJob(scheduledJobDetail);
this.schedularWritePlatformService.saveOrUpdate(scheduledJobDetail);
} catch (final Throwable throwable) {
final String stackTrace = getStackTraceAsString(throwable);
scheduledJobDetail.updateErrorLog(stackTrace);
this.schedularWritePlatformService.saveOrUpdate(scheduledJobDetail);
}
}
@Override
public void pauseScheduler() {
final SchedulerDetail schedulerDetail = this.schedularWritePlatformService.retriveSchedulerDetail();
if (!schedulerDetail.isSuspended()) {
schedulerDetail.updateSuspendedState(true);
this.schedularWritePlatformService.updateSchedulerDetail(schedulerDetail);
}
}
@Override
public void startScheduler() {
final SchedulerDetail schedulerDetail = this.schedularWritePlatformService.retriveSchedulerDetail();
if (schedulerDetail.isSuspended()) {
schedulerDetail.updateSuspendedState(false);
this.schedularWritePlatformService.updateSchedulerDetail(schedulerDetail);
if (schedulerDetail.isExecuteInstructionForMisfiredJobs()) {
final List<ScheduledJobDetail> scheduledJobDetails = this.schedularWritePlatformService.retrieveAllJobs();
for (final ScheduledJobDetail jobDetail : scheduledJobDetails) {
if (jobDetail.isTriggerMisfired()) {
if (jobDetail.isActiveSchedular()) {
executeJob(jobDetail, SchedulerServiceConstants.TRIGGER_TYPE_CRON);
}
final String schedulerName = getSchedulerName(jobDetail);
final Scheduler scheduler = this.schedulers.get(schedulerName);
if (scheduler != null) {
final String key = jobDetail.getJobKey();
final JobKey jobKey = constructJobKey(key);
try {
final List<? extends Trigger> triggers = scheduler.getTriggersOfJob(jobKey);
for (final Trigger trigger : triggers) {
if (trigger.getNextFireTime() != null && trigger.getNextFireTime().after(jobDetail.getNextRunTime())) {
jobDetail.updateNextRunTime(trigger.getNextFireTime());
}
}
} catch (final SchedulerException e) {
logger.error(e.getMessage(), e);
}
}
jobDetail.updateTriggerMisfired(false);
this.schedularWritePlatformService.saveOrUpdate(jobDetail);
}
}
}
}
}
@Override
public void rescheduleJob(final Long jobId) {
final ScheduledJobDetail scheduledJobDetail = this.schedularWritePlatformService.findByJobId(jobId);
rescheduleJob(scheduledJobDetail);
}
@Override
public void executeJob(final Long jobId) {
final ScheduledJobDetail scheduledJobDetail = this.schedularWritePlatformService.findByJobId(jobId);
if (scheduledJobDetail == null) { throw new JobNotFoundException(String.valueOf(jobId)); }
executeJob(scheduledJobDetail, null);
}
@Override
public boolean isSchedulerRunning() {
return !this.schedularWritePlatformService.retriveSchedulerDetail().isSuspended();
}
/**
* Need to use ContextClosedEvent instead of ContextStoppedEvent because in
* case Spring Boot fails to start-up (e.g. because Tomcat port is already
* in use) then org.springframework.boot.SpringApplication.run(String...)
* does a context.close(); and not a context.stop();
*/
@Override
public void onApplicationEvent(@SuppressWarnings("unused") ContextClosedEvent event) {
this.stopAllSchedulers();
}
private void scheduleJob(final ScheduledJobDetail scheduledJobDetails) {
if (!scheduledJobDetails.isActiveSchedular()) {
scheduledJobDetails.updateNextRunTime(null);
scheduledJobDetails.updateCurrentlyRunningStatus(false);
return;
}
try {
final JobDetail jobDetail = createJobDetail(scheduledJobDetails);
final Trigger trigger = createTrigger(scheduledJobDetails, jobDetail);
final Scheduler scheduler = getScheduler(scheduledJobDetails);
scheduler.scheduleJob(jobDetail, trigger);
scheduledJobDetails.updateJobKey(getJobKeyAsString(jobDetail.getKey()));
scheduledJobDetails.updateNextRunTime(trigger.getNextFireTime());
scheduledJobDetails.updateErrorLog(null);
} catch (final Throwable throwable) {
scheduledJobDetails.updateNextRunTime(null);
final String stackTrace = getStackTraceAsString(throwable);
scheduledJobDetails.updateErrorLog(stackTrace);
logger.error("Could not schedule job: " + scheduledJobDetails.getJobName(), throwable);
}
scheduledJobDetails.updateCurrentlyRunningStatus(false);
}
@Override
public void stopAllSchedulers() {
for (Scheduler scheduler : this.schedulers.values()) {
try {
scheduler.shutdown();
} catch (final SchedulerException e) {
logger.error(e.getMessage(), e);
}
}
}
private Scheduler getScheduler(final ScheduledJobDetail scheduledJobDetail) throws Exception {
final String schedulername = getSchedulerName(scheduledJobDetail);
Scheduler scheduler = this.schedulers.get(schedulername);
if (scheduler == null) {
int noOfThreads = SchedulerServiceConstants.DEFAULT_THREAD_COUNT;
if (scheduledJobDetail.getSchedulerGroup() > 0) {
noOfThreads = SchedulerServiceConstants.GROUP_THREAD_COUNT;
}
scheduler = createScheduler(schedulername, noOfThreads, schedulerJobListener);
this.schedulers.put(schedulername, scheduler);
}
return scheduler;
}
@Override
public void stopScheduler(final String name) {
final Scheduler scheduler = this.schedulers.remove(name);
try {
scheduler.shutdown();
} catch (final SchedulerException e) {
logger.error(e.getMessage(), e);
}
}
private String getSchedulerName(final ScheduledJobDetail scheduledJobDetail) {
final StringBuilder sb = new StringBuilder(20);
final FineractPlatformTenant tenant = ThreadLocalContextUtil.getTenant();
sb.append(SchedulerServiceConstants.SCHEDULER).append(tenant.getId());
if (scheduledJobDetail.getSchedulerGroup() > 0) {
sb.append(SchedulerServiceConstants.SCHEDULER_GROUP).append(scheduledJobDetail.getSchedulerGroup());
}
return sb.toString();
}
private Scheduler createScheduler(final String name, final int noOfThreads, JobListener... jobListeners) throws Exception {
final SchedulerFactoryBean schedulerFactoryBean = new SchedulerFactoryBean();
schedulerFactoryBean.setSchedulerName(name);
schedulerFactoryBean.setGlobalJobListeners(jobListeners);
final TriggerListener[] globalTriggerListeners = { globalSchedulerTriggerListener };
schedulerFactoryBean.setGlobalTriggerListeners(globalTriggerListeners);
final Properties quartzProperties = new Properties();
quartzProperties.put(SchedulerFactoryBean.PROP_THREAD_COUNT, Integer.toString(noOfThreads));
schedulerFactoryBean.setQuartzProperties(quartzProperties);
schedulerFactoryBean.afterPropertiesSet();
schedulerFactoryBean.start();
return schedulerFactoryBean.getScheduler();
}
private JobDetail createJobDetail(final ScheduledJobDetail scheduledJobDetail) throws Exception {
final FineractPlatformTenant tenant = ThreadLocalContextUtil.getTenant();
final ClassMethodNamesPair jobDetails = CronMethodParser.findTargetMethodDetails(scheduledJobDetail.getJobName());
if (jobDetails == null) { throw new IllegalArgumentException(
"Code has no @CronTarget with this job name (@see JobName); seems like DB/code are not in line: "
+ scheduledJobDetail.getJobName()); }
final Object targetObject = getBeanObject(Class.forName(jobDetails.className));
final MethodInvokingJobDetailFactoryBean jobDetailFactoryBean = new MethodInvokingJobDetailFactoryBean();
jobDetailFactoryBean.setName(scheduledJobDetail.getJobName() + "JobDetail" + tenant.getId());
jobDetailFactoryBean.setTargetObject(targetObject);
jobDetailFactoryBean.setTargetMethod(jobDetails.methodName);
jobDetailFactoryBean.setGroup(scheduledJobDetail.getGroupName());
jobDetailFactoryBean.setConcurrent(false);
jobDetailFactoryBean.afterPropertiesSet();
return jobDetailFactoryBean.getObject();
}
private Object getBeanObject(final Class<?> classType) throws ClassNotFoundException {
final List<Class<?>> typesList = new ArrayList<>();
final Class<?>[] interfaceType = classType.getInterfaces();
if (interfaceType.length > 0) {
typesList.addAll(Arrays.asList(interfaceType));
} else {
Class<?> superclassType = classType;
while (!Object.class.getName().equals(superclassType.getSuperclass().getName())) {
superclassType = superclassType.getSuperclass();
}
typesList.add(superclassType);
}
final List<String> beanNames = new ArrayList<>();
for (final Class<?> clazz : typesList) {
beanNames.addAll(Arrays.asList(this.applicationContext.getBeanNamesForType(clazz)));
}
Object targetObject = null;
for (final String beanName : beanNames) {
final Object nextObject = this.applicationContext.getBean(beanName);
String targetObjName = nextObject.toString();
targetObjName = targetObjName.substring(0, targetObjName.lastIndexOf("@"));
if (classType.getName().equals(targetObjName)) {
targetObject = nextObject;
break;
}
}
return targetObject;
}
private Trigger createTrigger(final ScheduledJobDetail scheduledJobDetails, final JobDetail jobDetail) {
final FineractPlatformTenant tenant = ThreadLocalContextUtil.getTenant();
final CronTriggerFactoryBean cronTriggerFactoryBean = new CronTriggerFactoryBean();
cronTriggerFactoryBean.setName(scheduledJobDetails.getJobName() + "Trigger" + tenant.getId());
cronTriggerFactoryBean.setJobDetail(jobDetail);
final JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put(SchedulerServiceConstants.TENANT_IDENTIFIER, tenant.getTenantIdentifier());
cronTriggerFactoryBean.setJobDataMap(jobDataMap);
final TimeZone timeZone = TimeZone.getTimeZone(tenant.getTimezoneId());
cronTriggerFactoryBean.setTimeZone(timeZone);
cronTriggerFactoryBean.setGroup(scheduledJobDetails.getGroupName());
cronTriggerFactoryBean.setCronExpression(scheduledJobDetails.getCronExpression());
cronTriggerFactoryBean.setPriority(scheduledJobDetails.getTaskPriority());
cronTriggerFactoryBean.afterPropertiesSet();
return cronTriggerFactoryBean.getObject();
}
private String getStackTraceAsString(final Throwable throwable) {
final StackTraceElement[] stackTraceElements = throwable.getStackTrace();
final StringBuffer sb = new StringBuffer(throwable.toString());
for (final StackTraceElement element : stackTraceElements) {
sb.append("\n \t at ").append(element.getClassName()).append(".").append(element.getMethodName()).append("(")
.append(element.getLineNumber()).append(")");
}
return sb.toString();
}
private String getJobKeyAsString(final JobKey jobKey) {
return jobKey.getName() + SchedulerServiceConstants.JOB_KEY_SEPERATOR + jobKey.getGroup();
}
private JobKey constructJobKey(final String Key) {
final String[] keyParams = Key.split(SchedulerServiceConstants.JOB_KEY_SEPERATOR);
final JobKey JobKey = new JobKey(keyParams[0], keyParams[1]);
return JobKey;
}
}