blob: 535359b3f6bbd756d5c83456920e64daf660b5d9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.syncope.core.provisioning.java.job;
import java.time.OffsetDateTime;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.syncope.common.keymaster.client.api.ConfParamOps;
import org.apache.syncope.common.lib.SyncopeConstants;
import org.apache.syncope.common.lib.types.IdRepoImplementationType;
import org.apache.syncope.common.lib.types.TaskType;
import org.apache.syncope.core.persistence.api.DomainHolder;
import org.apache.syncope.core.persistence.api.SyncopeCoreLoader;
import org.apache.syncope.core.persistence.api.dao.ImplementationDAO;
import org.apache.syncope.core.persistence.api.dao.JobStatusDAO;
import org.apache.syncope.core.persistence.api.dao.ReportDAO;
import org.apache.syncope.core.persistence.api.dao.TaskDAO;
import org.apache.syncope.core.persistence.api.entity.Implementation;
import org.apache.syncope.core.persistence.api.entity.Report;
import org.apache.syncope.core.persistence.api.entity.task.PullTask;
import org.apache.syncope.core.persistence.api.entity.task.PushTask;
import org.apache.syncope.core.persistence.api.entity.task.SchedTask;
import org.apache.syncope.core.persistence.api.entity.task.Task;
import org.apache.syncope.core.persistence.api.entity.task.TaskUtilsFactory;
import org.apache.syncope.core.provisioning.api.job.JobExecutionContext;
import org.apache.syncope.core.provisioning.api.job.JobManager;
import org.apache.syncope.core.provisioning.api.job.JobNamer;
import org.apache.syncope.core.provisioning.api.job.SchedTaskJobDelegate;
import org.apache.syncope.core.provisioning.java.job.notification.NotificationJob;
import org.apache.syncope.core.provisioning.java.job.report.ReportJob;
import org.apache.syncope.core.provisioning.java.pushpull.PullJobDelegate;
import org.apache.syncope.core.provisioning.java.pushpull.PushJobDelegate;
import org.apache.syncope.core.spring.ApplicationContextProvider;
import org.apache.syncope.core.spring.security.AuthContextUtils;
import org.apache.syncope.core.spring.security.SecurityProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.transaction.annotation.Transactional;
public class DefaultJobManager implements JobManager, SyncopeCoreLoader {
// Logger is registered under the JobManager interface rather than under this implementation class.
protected static final Logger LOG = LoggerFactory.getLogger(JobManager.class);
// Injected through the constructor; not referenced by any method in this class body.
protected final DomainHolder<?> domainHolder;
// Scheduler on which jobs are registered, scheduled, cancelled and deleted.
protected final SyncopeTaskScheduler scheduler;
// Source of the lock / unlock calls that isRunning() relies on.
protected final JobStatusDAO jobStatusDAO;
// Used by load() / unload() to enumerate SCHEDULED, PULL and PUSH tasks.
protected final TaskDAO taskDAO;
// Used by load() / unload() to enumerate all reports.
protected final ReportDAO reportDAO;
// Used to look up TASKJOB_DELEGATE implementations when a pull / push task has no delegate of its own.
protected final ImplementationDAO implementationDAO;
// Supplies the task type stored in the job context under JobManager.TASK_TYPE.
protected final TaskUtilsFactory taskUtilsFactory;
// Read in load() to obtain the "notificationjob.cronExpression" parameter of the Master domain.
protected final ConfParamOps confParamOps;
// Supplies the admin user name passed as executor for jobs registered by load().
protected final SecurityProperties securityProperties;
/**
 * Creates a job manager wired with all of its collaborators.
 *
 * @param domainHolder holder of the configured domains
 * @param scheduler scheduler on which jobs are registered, scheduled, cancelled and deleted
 * @param jobStatusDAO DAO providing the lock / unlock calls used by {@link #isRunning(String)}
 * @param taskDAO DAO used to enumerate tasks on load / unload
 * @param reportDAO DAO used to enumerate reports on load / unload
 * @param implementationDAO DAO used to look up job delegate implementations
 * @param taskUtilsFactory factory used to derive the type of a task
 * @param confParamOps configuration parameter access
 * @param securityProperties security settings, read here for the admin user name
 */
public DefaultJobManager(
        final DomainHolder<?> domainHolder,
        final SyncopeTaskScheduler scheduler,
        final JobStatusDAO jobStatusDAO,
        final TaskDAO taskDAO,
        final ReportDAO reportDAO,
        final ImplementationDAO implementationDAO,
        final TaskUtilsFactory taskUtilsFactory,
        final ConfParamOps confParamOps,
        final SecurityProperties securityProperties) {

    // scheduling, configuration and security infrastructure
    this.scheduler = scheduler;
    this.confParamOps = confParamOps;
    this.securityProperties = securityProperties;
    this.domainHolder = domainHolder;

    // persistence access
    this.taskDAO = taskDAO;
    this.reportDAO = reportDAO;
    this.jobStatusDAO = jobStatusDAO;
    this.implementationDAO = implementationDAO;

    // helpers
    this.taskUtilsFactory = taskUtilsFactory;
}
/**
 * Probes whether the named job is running by trying to take its lock.
 *
 * When {@code jobStatusDAO.lock} succeeds, the lock is released again straight away and the
 * job is reported as not running; when it does not succeed, the job is reported as running.
 *
 * @param jobName name of the job to probe
 * @return {@code true} if the lock could not be taken, {@code false} otherwise
 */
@Override
public boolean isRunning(final String jobName) {
    if (!jobStatusDAO.lock(jobName)) {
        // lock not obtained: report the job as running
        return true;
    }

    // the lock was only taken as a probe, give it back
    jobStatusDAO.unlock(jobName);
    return false;
}
/**
 * Creates a job bean of the given class, binds it to the given context and hands it to the scheduler.
 *
 * Nothing happens (besides a debug log) when {@link #isRunning(String)} reports the job as running.
 * Otherwise any previous registration with the same name is removed first.
 *
 * NOTE(review): {@code unregisterJob} resolves the domain through {@code AuthContextUtils}, not from
 * the context passed here - confirm the two always agree for every caller.
 *
 * @param context execution context; supplies the job name and is set on the created job
 * @param jobClass class of the job bean to create through the bean factory
 * @param cronExpression cron expression; when not {@code null} it is used regardless of {@code startAt}
 * @param startAt start instant, used only when {@code cronExpression} is {@code null}; when both are
 * {@code null} the job is only registered with the scheduler
 */
protected void registerJob(
        final JobExecutionContext context,
        final Class<? extends Job> jobClass,
        final String cronExpression,
        final OffsetDateTime startAt) {

    final String jobName = context.getJobName();
    if (isRunning(jobName)) {
        LOG.debug("Job {} already running, cancel", jobName);
        return;
    }

    // 0. drop any previous registration
    unregisterJob(jobName);

    // 1. build the job bean and bind the context to it
    Job job = ApplicationContextProvider.getBeanFactory().createBean(jobClass);
    job.setContext(context);

    // 2. cron trigger, one-shot start, or plain registration
    if (cronExpression != null) {
        scheduler.schedule(job, new CronTrigger(cronExpression));
    } else if (startAt != null) {
        scheduler.schedule(job, startAt.toInstant());
    } else {
        scheduler.register(job);
    }
}
/**
 * Registers the job for the given task under the given domain.
 *
 * The delegate implementation is the one set on the task; when the task has none, pull and push
 * tasks fall back to the {@code TASKJOB_DELEGATE} implementation whose body is the class name of
 * {@link PullJobDelegate} or {@link PushJobDelegate} respectively.
 *
 * @param domain domain stored in the job execution context
 * @param task task to register a job for
 * @param startAt start instant, forwarded to {@code registerJob} together with the task's cron expression
 * @param executor executor name stored in the job execution context
 * @param dryRun dry-run flag stored in the job execution context
 * @param jobData extra entries for the context data, added after the task type, task key and
 * delegate key entries; {@code null} is treated as no extra entries
 * @throws IllegalArgumentException if no delegate implementation can be determined for the task
 */
protected void register(
        final String domain,
        final SchedTask task,
        final OffsetDateTime startAt,
        final String executor,
        final boolean dryRun,
        final Map<String, Object> jobData) {

    Implementation jobDelegate = task.getJobDelegate();
    if (jobDelegate == null) {
        // no explicit delegate: only pull and push tasks have a default one
        if (task instanceof PullTask) {
            jobDelegate = findDefaultDelegate(PullJobDelegate.class);
        } else if (task instanceof PushTask) {
            jobDelegate = findDefaultDelegate(PushJobDelegate.class);
        }
    }
    if (jobDelegate == null) {
        throw new IllegalArgumentException("Task " + task
                + " does not provide any " + SchedTaskJobDelegate.class.getSimpleName());
    }

    JobExecutionContext context = new JobExecutionContext(
            domain,
            JobNamer.getJobName(task),
            executor,
            dryRun);
    context.getData().put(JobManager.TASK_TYPE, taskUtilsFactory.getInstance(task).getType());
    context.getData().put(JobManager.TASK_KEY, task.getKey());
    context.getData().put(JobManager.DELEGATE_IMPLEMENTATION, jobDelegate.getKey());
    if (jobData != null) {
        // guard: putAll would otherwise fail on a null map
        context.getData().putAll(jobData);
    }

    registerJob(
            context,
            TaskJob.class,
            task.getCronExpression(),
            startAt);
}

/**
 * Finds the first {@code TASKJOB_DELEGATE} implementation whose body equals the given class name.
 *
 * @param delegateClass class whose fully qualified name is matched against the implementation body
 * @return the matching implementation, or {@code null} if there is none
 */
private Implementation findDefaultDelegate(final Class<?> delegateClass) {
    final String className = delegateClass.getName();
    return implementationDAO.findByType(IdRepoImplementationType.TASKJOB_DELEGATE).stream().
            filter(impl -> className.equals(impl.getBody())).
            findFirst().orElse(null);
}
/**
 * Registers the job for the given task under the domain reported by {@code AuthContextUtils.getDomain()}.
 *
 * @param task task to register a job for
 * @param startAt start instant, forwarded unchanged
 * @param executor executor name, forwarded unchanged
 * @param dryRun dry-run flag, forwarded unchanged
 * @param jobData extra job data, forwarded unchanged
 */
@Override
public void register(
        final SchedTask task,
        final OffsetDateTime startAt,
        final String executor,
        final boolean dryRun,
        final Map<String, Object> jobData) {

    final String currentDomain = AuthContextUtils.getDomain();
    register(currentDomain, task, startAt, executor, dryRun, jobData);
}
/**
 * Registers the job for the given report under the given domain.
 *
 * The context data carries the report key and the key of the report's delegate implementation;
 * the job is scheduled with the report's cron expression and the given start instant.
 *
 * @param domain domain stored in the job execution context
 * @param report report to register a job for
 * @param startAt start instant, forwarded to {@code registerJob}
 * @param executor executor name stored in the job execution context
 * @param dryRun dry-run flag stored in the job execution context
 * @throws IllegalArgumentException if the report has no job delegate, mirroring the check
 * performed when registering tasks
 */
protected void register(
        final String domain,
        final Report report,
        final OffsetDateTime startAt,
        final String executor,
        final boolean dryRun) {

    // fail with a descriptive message rather than a bare NullPointerException further down
    if (report.getJobDelegate() == null) {
        throw new IllegalArgumentException("Report " + report + " does not provide any job delegate");
    }

    JobExecutionContext context = new JobExecutionContext(
            domain,
            JobNamer.getJobName(report),
            executor,
            dryRun);
    context.getData().put(JobManager.REPORT_KEY, report.getKey());
    context.getData().put(JobManager.DELEGATE_IMPLEMENTATION, report.getJobDelegate().getKey());

    registerJob(
            context,
            ReportJob.class,
            report.getCronExpression(),
            startAt);
}
/**
 * Registers the job for the given report under the domain reported by {@code AuthContextUtils.getDomain()}.
 *
 * @param report report to register a job for
 * @param startAt start instant, forwarded unchanged
 * @param executor executor name, forwarded unchanged
 * @param dryRun dry-run flag, forwarded unchanged
 */
@Override
public void register(
        final Report report,
        final OffsetDateTime startAt,
        final String executor,
        final boolean dryRun) {

    final String currentDomain = AuthContextUtils.getDomain();
    register(currentDomain, report, startAt, executor, dryRun);
}
/**
 * Cancels and then deletes the named job on the scheduler, for the domain reported by
 * {@code AuthContextUtils.getDomain()}.
 *
 * @param jobName name of the job to remove
 */
protected void unregisterJob(final String jobName) {
    final String currentDomain = AuthContextUtils.getDomain();

    scheduler.cancel(currentDomain, jobName);
    scheduler.delete(currentDomain, jobName);
}
/**
 * Removes the job whose name {@code JobNamer} derives from the given task.
 *
 * @param task task whose job is to be removed
 */
@Override
public void unregister(final Task<?> task) {
    final String jobName = JobNamer.getJobName(task);
    unregisterJob(jobName);
}
/**
 * Removes the job whose name {@code JobNamer} derives from the given report.
 *
 * @param report report whose job is to be removed
 */
@Override
public void unregister(final Report report) {
    final String jobName = JobNamer.getJobName(report);
    unregisterJob(jobName);
}
/**
 * Returns the ordering value of this loader.
 *
 * NOTE(review): presumably compared with the order of other {@link SyncopeCoreLoader} instances
 * to sequence their execution - confirm against the code invoking the loaders.
 *
 * @return the constant {@code 500}
 */
@Override
public int getOrder() {
return 500;
}
/**
 * Registers the jobs of the given domain.
 *
 * As admin of the domain, jobs are registered for all SCHEDULED, PULL and PUSH tasks and then for
 * all reports; the first failure is logged and stops any further task / report registration.
 * For the Master domain only, the notification job (unless its cron expression is blank) and the
 * system load reporter job (every minute) are registered afterwards, regardless of earlier failures.
 *
 * @param domain domain whose jobs are to be registered
 */
@Transactional
@Override
public void load(final String domain) {
    final boolean masterDomain = SyncopeConstants.MASTER_DOMAIN.equals(domain);

    // read up-front, before anything is registered; only the Master domain ever uses the value
    final String notificationJobCronExp = masterDomain ? resolveNotificationJobCronExp() : null;

    AuthContextUtils.runAsAdmin(domain, () -> {
        if (!loadTaskJobs(domain)) {
            LOG.debug("Errors while loading job instances for tasks, aborting");
        } else if (!loadReportJobs(domain)) {
            LOG.debug("Errors while loading job instances for reports, aborting");
        }
    });

    if (!masterDomain) {
        return;
    }

    // 3. NotificationJob
    if (StringUtils.isBlank(notificationJobCronExp)) {
        LOG.debug("Empty value provided for {}'s cron, not scheduling", NotificationJob.class.getSimpleName());
    } else {
        LOG.debug("{}'s cron expression: {} - scheduling",
                NotificationJob.class.getSimpleName(), notificationJobCronExp);

        JobExecutionContext notificationContext = new JobExecutionContext(
                domain,
                NOTIFICATION_JOB,
                securityProperties.getAdminUser(),
                false);
        try {
            registerJob(
                    notificationContext,
                    NotificationJob.class,
                    notificationJobCronExp,
                    null);
        } catch (Exception e) {
            LOG.error("While loading {} instance", NotificationJob.class.getSimpleName(), e);
        }
    }

    // 4. SystemLoadReporterJob (fixed schedule, every minute)
    LOG.debug("Registering {}", SystemLoadReporterJob.class);

    JobExecutionContext loadReporterContext = new JobExecutionContext(
            domain,
            StringUtils.uncapitalize(SystemLoadReporterJob.class.getSimpleName()),
            securityProperties.getAdminUser(),
            false);
    try {
        registerJob(
                loadReporterContext,
                SystemLoadReporterJob.class,
                "0 * * * * ?",
                null);
    } catch (Exception e) {
        LOG.error("While loading {} instance", SystemLoadReporterJob.class.getSimpleName(), e);
    }
}

/**
 * Reads the notification job cron expression from the Master domain configuration, as admin.
 *
 * @return the default cron expression when the parameter is not set, otherwise the configured
 * value as is (possibly empty)
 */
private String resolveNotificationJobCronExp() {
    return AuthContextUtils.callAsAdmin(SyncopeConstants.MASTER_DOMAIN, () -> {
        String conf = confParamOps.get(
                SyncopeConstants.MASTER_DOMAIN, "notificationjob.cronExpression", null, String.class);
        return conf == null ? NotificationJob.DEFAULT_CRON_EXP : conf;
    });
}

/**
 * Registers a job for every SCHEDULED, PULL and PUSH task, stopping at the first failure.
 *
 * @param domain domain to register the jobs under
 * @return {@code true} if all registrations succeeded, {@code false} if one failed
 */
private boolean loadTaskJobs(final String domain) {
    // 1. jobs for SchedTasks
    Set<SchedTask> tasks = new HashSet<>(taskDAO.<SchedTask>findAll(TaskType.SCHEDULED));
    tasks.addAll(taskDAO.<SchedTask>findAll(TaskType.PULL));
    tasks.addAll(taskDAO.<SchedTask>findAll(TaskType.PUSH));

    for (SchedTask task : tasks) {
        try {
            register(domain, task, task.getStartAt(), securityProperties.getAdminUser(), false, Map.of());
        } catch (Exception e) {
            LOG.error("While loading job instance for task {}", task.getKey(), e);
            return false;
        }
    }
    return true;
}

/**
 * Registers a job for every report, stopping at the first failure.
 *
 * @param domain domain to register the jobs under
 * @return {@code true} if all registrations succeeded, {@code false} if one failed
 */
private boolean loadReportJobs(final String domain) {
    // 2. jobs for Reports
    for (Report report : reportDAO.findAll()) {
        try {
            register(domain, report, null, securityProperties.getAdminUser(), false);
        } catch (Exception e) {
            LOG.error("While loading job instance for report {}", report.getName(), e);
            return false;
        }
    }
    return true;
}
/**
 * Removes, as admin of the given domain, the jobs of all SCHEDULED, PULL and PUSH tasks and of
 * all reports.
 *
 * A failure on one task or report is logged and does not prevent the remaining ones from being
 * processed.
 *
 * @param domain domain whose task and report jobs are to be removed
 */
@Override
public void unload(final String domain) {
    AuthContextUtils.runAsAdmin(domain, () -> {
        // 1. jobs for SchedTasks
        Set<SchedTask> tasks = new HashSet<>(taskDAO.<SchedTask>findAll(TaskType.SCHEDULED));
        tasks.addAll(taskDAO.<SchedTask>findAll(TaskType.PULL));
        tasks.addAll(taskDAO.<SchedTask>findAll(TaskType.PUSH));

        for (SchedTask task : tasks) {
            try {
                unregister(task);
            } catch (Exception e) {
                LOG.error("While unloading job instance for task {}", task.getKey(), e);
            }
        }

        // 2. jobs for Reports
        for (Report report : reportDAO.findAll()) {
            try {
                unregister(report);
            } catch (Exception e) {
                LOG.error("While unloading job instance for report {}", report.getName(), e);
            }
        }
    });
}
}