// blob: b46ea8b817023dd367aec909613cb57703d1b8f0 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
*/
package org.apache.griffin.core.job;
import static org.apache.griffin.core.exception.GriffinExceptionMessage.INVALID_CONNECTOR_NAME;
import static org.apache.griffin.core.exception.GriffinExceptionMessage.INVALID_CRON_EXPRESSION;
import static org.apache.griffin.core.exception.GriffinExceptionMessage.INVALID_JOB_NAME;
import static org.apache.griffin.core.exception.GriffinExceptionMessage.JOB_IS_NOT_IN_PAUSED_STATUS;
import static org.apache.griffin.core.exception.GriffinExceptionMessage.JOB_IS_NOT_SCHEDULED;
import static org.apache.griffin.core.exception.GriffinExceptionMessage.JOB_KEY_DOES_NOT_EXIST;
import static org.apache.griffin.core.exception.GriffinExceptionMessage.MISSING_BASELINE_CONFIG;
import static org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType.BATCH;
import static org.quartz.CronExpression.isValidExpression;
import static org.quartz.JobKey.jobKey;
import static org.quartz.Trigger.TriggerState;
import static org.quartz.Trigger.TriggerState.BLOCKED;
import static org.quartz.Trigger.TriggerState.NORMAL;
import static org.quartz.Trigger.TriggerState.PAUSED;
import static org.quartz.TriggerKey.triggerKey;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.griffin.core.exception.GriffinException;
import org.apache.griffin.core.job.entity.AbstractJob;
import org.apache.griffin.core.job.entity.BatchJob;
import org.apache.griffin.core.job.entity.JobDataSegment;
import org.apache.griffin.core.job.entity.JobHealth;
import org.apache.griffin.core.job.entity.JobInstanceBean;
import org.apache.griffin.core.job.entity.JobState;
import org.apache.griffin.core.job.entity.LivySessionStates;
import org.apache.griffin.core.job.repo.BatchJobRepo;
import org.apache.griffin.core.job.repo.JobInstanceRepo;
import org.apache.griffin.core.measure.entity.DataSource;
import org.apache.griffin.core.measure.entity.GriffinMeasure;
import org.quartz.JobKey;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.TriggerKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
/**
 * {@link JobOperator} implementation for batch Griffin jobs scheduled through
 * Quartz.
 *
 * <p>Handles creation, start (resume), stop (pause) and deletion of batch
 * jobs, reports their trigger state and health, and cleans up the predicate
 * jobs that were scheduled for each job instance.
 */
@Service
public class BatchJobOperatorImpl implements JobOperator {
    private static final Logger LOGGER = LoggerFactory
        .getLogger(BatchJobOperatorImpl.class);

    @Autowired
    @Qualifier("schedulerFactoryBean")
    private SchedulerFactoryBean factory;
    @Autowired
    private JobInstanceRepo instanceRepo;
    @Autowired
    private BatchJobRepo batchJobRepo;
    @Autowired
    private JobServiceImpl jobService;

    /**
     * Validates a new batch job, persists it and registers it with Quartz.
     *
     * @param job     job definition; cast to {@link BatchJob}
     * @param measure measure the job runs; its connector names are used to
     *                validate the job's data segments
     * @return the job entity returned by the repository save
     * @throws Exception if validation, persistence or scheduling fails; the
     *                   transaction is rolled back for any exception
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public AbstractJob add(AbstractJob job, GriffinMeasure measure)
        throws Exception {
        validateParams(job, measure);
        String qName = jobService.getQuartzName(job);
        String qGroup = jobService.getQuartzGroup();
        TriggerKey triggerKey = jobService.getTriggerKeyIfValid(qName, qGroup);
        BatchJob batchJob = genBatchJobBean(job, qName, qGroup);
        batchJob = batchJobRepo.save(batchJob);
        jobService.addJob(triggerKey, batchJob, BATCH);
        // Repository save() may hand back a different instance than the one
        // passed in, so return the saved entity rather than the input.
        return batchJob;
    }

    /**
     * Fills in the Quartz-related fields of a batch job.
     *
     * @param job    job to populate; cast to {@link BatchJob} and mutated
     * @param qName  Quartz job name
     * @param qGroup Quartz job group
     * @return the same job instance, typed as {@link BatchJob}
     */
    private BatchJob genBatchJobBean(AbstractJob job,
                                     String qName,
                                     String qGroup) {
        BatchJob batchJob = (BatchJob) job;
        batchJob.setMetricName(job.getJobName());
        batchJob.setGroup(qGroup);
        batchJob.setName(qName);
        return batchJob;
    }

    /**
     * Resumes a paused batch job.
     *
     * <p>All states: BLOCKED COMPLETE ERROR NONE NORMAL PAUSED.
     * States that can be started: PAUSED.
     * States that can be stopped: BLOCKED NORMAL.
     *
     * @param job batch job to resume
     * @throws GriffinException.BadRequestException if the job has no trigger
     *         or is not in PAUSED state
     * @throws GriffinException.ServiceException if Quartz fails to resume it
     */
    @Override
    public void start(AbstractJob job) {
        String name = job.getName();
        String group = job.getGroup();
        TriggerState state = getTriggerState(name, group);
        if (state == null) {
            throw new GriffinException.BadRequestException(
                JOB_IS_NOT_SCHEDULED);
        }
        // A job that is not paused may currently be running, so refuse to
        // start it.
        if (state != PAUSED) {
            throw new GriffinException.BadRequestException(
                JOB_IS_NOT_IN_PAUSED_STATUS);
        }
        JobKey jobKey = jobKey(name, group);
        try {
            factory.getScheduler().resumeJob(jobKey);
        } catch (SchedulerException e) {
            throw new GriffinException.ServiceException(
                "Failed to start job.", e);
        }
    }

    /**
     * Pauses a batch job without marking it deleted.
     *
     * @param job batch job to pause; cast to {@link BatchJob}
     */
    @Override
    public void stop(AbstractJob job) {
        pauseJob((BatchJob) job, false);
    }

    /**
     * Pauses a batch job and marks it deleted.
     *
     * @param job batch job to delete; cast to {@link BatchJob}
     */
    @Override
    @Transactional
    public void delete(AbstractJob job) {
        pauseJob((BatchJob) job, true);
    }

    /**
     * Adds this job's contribution to an aggregate health report.
     *
     * <p>A job is counted only when it still has triggers. It is counted as
     * healthy when {@code jobService.isJobHealthy} says so.
     *
     * @param jobHealth aggregate counters; mutated in place
     * @param job       job to inspect
     * @return the same {@code jobHealth} instance
     * @throws SchedulerException if triggers cannot be read
     */
    @Override
    public JobHealth getHealth(JobHealth jobHealth, AbstractJob job)
        throws SchedulerException {
        List<? extends Trigger> triggers = jobService
            .getTriggers(job.getName(), job.getGroup());
        if (!CollectionUtils.isEmpty(triggers)) {
            jobHealth.setJobCount(jobHealth.getJobCount() + 1);
            if (jobService.isJobHealthy(job.getId())) {
                jobHealth.setHealthyJobCount(
                    jobHealth.getHealthyJobCount() + 1);
            }
        }
        return jobHealth;
    }

    /**
     * Builds the current scheduling state of a job.
     *
     * @param job    job to inspect
     * @param action unused by this implementation
     * @return the job state, or {@code null} when the job has no Quartz name
     *         or group
     * @throws SchedulerException if Quartz cannot report the trigger state
     */
    @Override
    public JobState getState(AbstractJob job, String action)
        throws SchedulerException {
        if (job.getGroup() == null || job.getName() == null) {
            return null;
        }
        Scheduler scheduler = factory.getScheduler();
        TriggerKey triggerKey = triggerKey(job.getName(), job.getGroup());
        TriggerState triggerState = scheduler.getTriggerState(triggerKey);
        JobState jobState = new JobState();
        jobState.setState(triggerState.toString());
        jobState.setToStart(getStartStatus(triggerState));
        jobState.setToStop(getStopStatus(triggerState));
        setTriggerTime(job, jobState);
        return jobState;
    }

    /**
     * Copies the next and previous fire times of the job's first trigger
     * into {@code jobState}, using -1 for an absent time. Leaves
     * {@code jobState} untouched when the job has no triggers.
     */
    private void setTriggerTime(AbstractJob job, JobState jobState)
        throws SchedulerException {
        List<? extends Trigger> triggers = jobService
            .getTriggers(job.getName(), job.getGroup());
        // In Griffin an empty trigger list means the job has completed
        // (trigger state NONE) or was never scheduled.
        if (CollectionUtils.isEmpty(triggers)) {
            return;
        }
        Trigger trigger = triggers.get(0);
        Date nextFireTime = trigger.getNextFireTime();
        Date previousFireTime = trigger.getPreviousFireTime();
        jobState.setNextFireTime(nextFireTime != null
            ? nextFireTime.getTime() : -1);
        jobState.setPreviousFireTime(previousFireTime != null
            ? previousFireTime.getTime() : -1);
    }

    /**
     * Only a job in PAUSED state can be started.
     *
     * @param state job trigger state
     * @return true if the job can be started; false if it may be running and
     *         cannot be started
     */
    private boolean getStartStatus(TriggerState state) {
        return state == PAUSED;
    }

    /**
     * Only a job in NORMAL or BLOCKED state can be stopped.
     *
     * @param state job trigger state
     * @return true if the job can be stopped, false otherwise
     */
    private boolean getStopStatus(TriggerState state) {
        return state == NORMAL || state == BLOCKED;
    }

    /**
     * Looks up the Quartz state of the first trigger of a job.
     *
     * @return the trigger state, or {@code null} if the job has no triggers
     * @throws GriffinException.ServiceException on scheduler failure
     */
    private TriggerState getTriggerState(String name, String group) {
        try {
            List<? extends Trigger> triggers = jobService.getTriggers(name,
                group);
            if (CollectionUtils.isEmpty(triggers)) {
                return null;
            }
            TriggerKey key = triggers.get(0).getKey();
            return factory.getScheduler().getTriggerState(key);
        } catch (SchedulerException e) {
            LOGGER.error("Failed to get trigger state", e);
            throw new GriffinException
                .ServiceException("Failed to get trigger state", e);
        }
    }

    /**
     * Pauses the job in Quartz, removes its instances' predicate jobs, and
     * persists the job with its deleted flag.
     *
     * @param job    griffin batch job
     * @param delete value stored in the job's deleted flag: true for delete,
     *               false for a plain pause
     * @throws GriffinException.ServiceException wrapping any failure
     */
    private void pauseJob(BatchJob job, boolean delete) {
        try {
            pauseJob(job.getGroup(), job.getName());
            pausePredicateJob(job);
            job.setDeleted(delete);
            batchJobRepo.save(job);
        } catch (Exception e) {
            // NOTE(review): this also re-wraps the NotFoundException raised
            // by pauseJob(String, String) into a ServiceException; preserved
            // for compatibility.
            LOGGER.error("Job schedule happens exception.", e);
            throw new GriffinException.ServiceException("Job schedule " +
                "happens exception.", e);
        }
    }

    /**
     * Deletes the not-yet-deleted predicate jobs of every instance of
     * {@code job}. Instances still in FINDING state are moved to NOT_FOUND.
     * The modified instances are then persisted.
     */
    private void pausePredicateJob(BatchJob job) throws SchedulerException {
        List<JobInstanceBean> instances = instanceRepo.findByJobId(job.getId());
        List<JobInstanceBean> modified = new ArrayList<>();
        for (JobInstanceBean instance : instances) {
            if (!instance.isPredicateDeleted()) {
                deleteJob(instance.getPredicateGroup(), instance
                    .getPredicateName());
                instance.setPredicateDeleted(true);
                // Constant-first comparison tolerates a null state.
                if (LivySessionStates.State.FINDING.equals(instance.getState())) {
                    instance.setState(LivySessionStates.State.NOT_FOUND);
                }
                modified.add(instance);
            }
        }
        // Persist explicitly: stop() runs outside a transaction, so mutations
        // on the loaded instances would otherwise be lost.
        if (!modified.isEmpty()) {
            instanceRepo.saveAll(modified);
        }
    }

    /**
     * Deletes a Quartz job if it exists; logs and returns when it does not.
     *
     * @param group Quartz job group
     * @param name  Quartz job name
     * @throws SchedulerException on scheduler failure
     */
    public void deleteJob(String group, String name) throws SchedulerException {
        Scheduler scheduler = factory.getScheduler();
        JobKey jobKey = new JobKey(name, group);
        if (!scheduler.checkExists(jobKey)) {
            LOGGER.info("Job({},{}) does not exist.", jobKey.getGroup(), jobKey
                .getName());
            return;
        }
        scheduler.deleteJob(jobKey);
    }

    /**
     * Pauses a Quartz job. Does nothing when group or name is empty.
     *
     * @throws GriffinException.NotFoundException if the job key does not
     *         exist in the scheduler
     * @throws SchedulerException on scheduler failure
     */
    private void pauseJob(String group, String name) throws SchedulerException {
        if (StringUtils.isEmpty(group) || StringUtils.isEmpty(name)) {
            return;
        }
        Scheduler scheduler = factory.getScheduler();
        JobKey jobKey = new JobKey(name, group);
        if (!scheduler.checkExists(jobKey)) {
            LOGGER.warn("Job({},{}) does not exist.", jobKey.getGroup(), jobKey
                .getName());
            throw new GriffinException.NotFoundException(
                JOB_KEY_DOES_NOT_EXIST);
        }
        scheduler.pauseJob(jobKey);
    }

    /**
     * Deletes the predicate jobs of the given instances and persists the
     * instances whose predicate job was removed.
     *
     * @param instances job instances; null or empty is treated as success
     * @return true if every predicate job was handled without scheduler
     *         errors
     */
    public boolean pauseJobInstances(List<JobInstanceBean> instances) {
        if (CollectionUtils.isEmpty(instances)) {
            return true;
        }
        List<JobInstanceBean> deletedInstances = new ArrayList<>();
        boolean pauseStatus = true;
        for (JobInstanceBean instance : instances) {
            boolean status = pauseJobInstance(instance, deletedInstances);
            pauseStatus = pauseStatus && status;
        }
        instanceRepo.saveAll(deletedInstances);
        return pauseStatus;
    }

    /**
     * Deletes one instance's predicate job if it has not been deleted yet.
     * On success the instance is flagged and added to
     * {@code deletedInstances}.
     *
     * @return false if the scheduler failed, true otherwise
     */
    private boolean pauseJobInstance(JobInstanceBean instance,
                                     List<JobInstanceBean> deletedInstances) {
        String pGroup = instance.getPredicateGroup();
        String pName = instance.getPredicateName();
        try {
            if (!instance.isPredicateDeleted()) {
                deleteJob(pGroup, pName);
                instance.setPredicateDeleted(true);
                deletedInstances.add(instance);
            }
        } catch (SchedulerException e) {
            // Pass the exception so the cause and stack trace are logged.
            LOGGER.error("Failed to pause predicate job({},{}).", pGroup,
                pName, e);
            return false;
        }
        return true;
    }

    /**
     * Validates job name, cron expression, timestamp baseline and connector
     * names.
     *
     * @throws GriffinException.BadRequestException on the first failed check
     */
    private void validateParams(AbstractJob job, GriffinMeasure measure) {
        if (!jobService.isValidJobName(job.getJobName())) {
            throw new GriffinException.BadRequestException(INVALID_JOB_NAME);
        }
        if (!isValidCronExpression(job.getCronExpression())) {
            throw new GriffinException.BadRequestException(
                INVALID_CRON_EXPRESSION);
        }
        if (!isValidBaseLine(job.getSegments())) {
            throw new GriffinException.BadRequestException(
                MISSING_BASELINE_CONFIG);
        }
        List<String> names = getConnectorNames(measure);
        if (!isValidConnectorNames(job.getSegments(), names)) {
            throw new GriffinException.BadRequestException(
                INVALID_CONNECTOR_NAME);
        }
    }

    /** Returns true if the cron expression is non-empty and parses in Quartz. */
    private boolean isValidCronExpression(String cronExpression) {
        if (StringUtils.isEmpty(cronExpression)) {
            LOGGER.warn("Cron Expression is empty.");
            return false;
        }
        if (!isValidExpression(cronExpression)) {
            LOGGER.warn("Cron Expression is invalid: {}", cronExpression);
            return false;
        }
        return true;
    }

    /**
     * Returns true if at least one segment is marked as the timestamp
     * baseline. A null or empty segment list has no baseline.
     */
    private boolean isValidBaseLine(List<JobDataSegment> segments) {
        // Explicit check instead of `assert`, which is disabled in production.
        if (segments != null) {
            for (JobDataSegment jds : segments) {
                if (jds.isAsTsBaseline()) {
                    return true;
                }
            }
        }
        LOGGER.warn("Please set segment timestamp baseline " +
            "in as.baseline field.");
        return false;
    }

    /**
     * Returns true if every segment references one of {@code names} and no
     * two segments reference the same connector.
     *
     * @param segments job data segments; null is rejected
     * @param names    valid connector names from the measure
     */
    private boolean isValidConnectorNames(List<JobDataSegment> segments,
                                          List<String> names) {
        if (segments == null) {
            LOGGER.warn("Job data segments are missing.");
            return false;
        }
        Set<String> sets = new HashSet<>();
        for (JobDataSegment segment : segments) {
            String dcName = segment.getDataConnectorName();
            sets.add(dcName);
            if (!names.contains(dcName)) {
                LOGGER.warn("Param {} is a illegal string. " +
                    "Please input one of strings in {}.", dcName, names);
                return false;
            }
        }
        if (sets.size() < segments.size()) {
            LOGGER.warn("Connector names in job data segment " +
                "cannot duplicate.");
            return false;
        }
        return true;
    }

    /**
     * Collects the connector names of all data sources in the measure.
     *
     * @return the distinct connector names, or an empty list if any name
     *         appears more than once across all connectors
     */
    private List<String> getConnectorNames(GriffinMeasure measure) {
        List<String> allNames = new ArrayList<>();
        for (DataSource source : measure.getDataSources()) {
            source.getConnectors().forEach(dc -> allNames.add(dc.getName()));
        }
        // Compare against the total connector count, not the data source
        // count: a source may own several connectors, or none.
        Set<String> uniqueNames = new HashSet<>(allNames);
        if (uniqueNames.size() < allNames.size()) {
            LOGGER.warn("Connector names cannot be repeated.");
            return Collections.emptyList();
        }
        return new ArrayList<>(uniqueNames);
    }
}