blob: 2009a20fddced4b0d62eaae90c98dfe1cb989686 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lens.server.scheduler;
import org.apache.lens.api.scheduler.SchedulerJobHandle;
import org.apache.lens.api.scheduler.XFrequency;
import org.apache.lens.api.scheduler.XFrequencyEnum;
import org.apache.lens.server.LensServices;
import org.apache.lens.server.api.LensService;
import org.apache.lens.server.api.error.LensException;
import org.apache.lens.server.api.events.LensEventService;
import org.apache.lens.server.api.events.SchedulerAlarmEvent;
import org.apache.lens.server.api.health.HealthStatus;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hive.service.AbstractService;
import org.joda.time.DateTime;
import org.quartz.*;
import org.quartz.impl.StdSchedulerFactory;
import lombok.extern.slf4j.Slf4j;
/**
* This service is used primarily by Scheduler to get alarm notifications for scheduled queries.
* <p>
* As a schedule this service accepts start time, frequency, end time and timeZone. It also requires the
* {@link SchedulerJobHandle} which it sends as part of the
 * {@link org.apache.lens.server.api.events.SchedulerAlarmEvent} to inform the scheduler about the job for which
 * the notification has been generated.
*/
@Slf4j
public class AlarmService extends AbstractService implements LensService {

  public static final String NAME = "alarm";
  public static final String LENS_JOBS = "LensJobs";
  public static final String ALARM_SERVICE = "AlarmService";

  /**
   * Key under which the job handle string is stored in the Quartz {@link JobDataMap}; written by
   * {@link #schedule} and read back by {@link LensJob#execute}.
   */
  private static final String JOB_HANDLE_KEY = "jobHandle";

  private Scheduler scheduler;

  /**
   * True if the service started properly and is running fine, false otherwise.
   */
  private boolean isHealthy = true;

  /**
   * Contains the reason if service is not healthy.
   */
  private String healthCause;

  /**
   * Creates a new instance of AlarmService.
   */
  public AlarmService() {
    super(NAME);
  }

  /**
   * Reports whether the Quartz scheduler was obtained and started without error.
   *
   * @return a healthy status, or an unhealthy status carrying the recorded failure reason
   */
  @Override
  public HealthStatus getHealthStatus() {
    return isHealthy
      ? new HealthStatus(true, "Alarm service is healthy.")
      : new HealthStatus(false, healthCause);
  }

  /**
   * Initializes the service and obtains the default Quartz scheduler.
   *
   * @param hiveConf configuration handed to the parent service
   * @throws IllegalStateException if the Quartz scheduler cannot be obtained
   */
  @Override
  public synchronized void init(HiveConf hiveConf) {
    super.init(hiveConf);
    try {
      this.scheduler = StdSchedulerFactory.getDefaultScheduler();
    } catch (SchedulerException e) {
      isHealthy = false;
      healthCause = "Failed to initialize the Quartz Scheduler for AlarmService.";
      log.error(healthCause, e);
      throw new IllegalStateException("Could not initialize the Alarm Service", e);
    }
  }

  /**
   * Starts the underlying Quartz scheduler.
   *
   * @throws IllegalStateException if the Quartz scheduler fails to start
   */
  @Override
  public synchronized void start() {
    try {
      scheduler.start();
      log.info("Alarm service started successfully!");
    } catch (SchedulerException e) {
      isHealthy = false;
      healthCause = "Failed to start the Quartz Scheduler for AlarmService.";
      log.error(healthCause, e);
      throw new IllegalStateException("Could not start the Alarm service", e);
    }
  }

  /**
   * Stops alarm delivery by putting the Quartz scheduler into standby. A failure is logged and not propagated.
   */
  @Override
  public synchronized void stop() {
    try {
      scheduler.standby();
      log.info("Alarm Service stopped successfully.");
    } catch (SchedulerException e) {
      log.error("Failed to shut down the Quartz Scheduler for AlarmService.", e);
    }
  }

  /**
   * This method can be used by any consumer who wants to receive notifications during a time range at a given
   * frequency.
   * <p>
   * This method is intended to be used by LensScheduler to subscribe for time based notifications to schedule queries.
   * On receiving a job to be scheduled LensScheduler will subscribe to all triggers required for the job, including
   * AlarmService for time based triggers.
   *
   * @param start     start time for notifications
   * @param end       end time for notifications
   * @param frequency frequency at which notifications should be sent; when its enum value is set a calendar
   *                  interval trigger is used, otherwise its cron expression is used
   * @param jobHandle must be a unique jobHandle across all consumers
   * @throws LensException if Quartz rejects the job or the trigger
   */
  public void schedule(DateTime start, DateTime end, XFrequency frequency, String jobHandle) throws LensException {
    // accept the schedule and then keep on sending the notifications for that schedule
    JobDataMap map = new JobDataMap();
    map.put(JOB_HANDLE_KEY, jobHandle);
    JobDetail job = JobBuilder.newJob(LensJob.class).withIdentity(jobHandle, LENS_JOBS).usingJobData(map).build();

    XFrequencyEnum frequencyEnum = frequency.getEnum();
    Trigger trigger;
    if (frequencyEnum != null) { // for enum expression: create a trigger using calendar interval
      CalendarIntervalScheduleBuilder calendarSchedule = CalendarIntervalScheduleBuilder.calendarIntervalSchedule()
        .withInterval(getTimeInterval(frequencyEnum), getTimeUnit(frequencyEnum))
        .withMisfireHandlingInstructionIgnoreMisfires();
      trigger = newTriggerBuilder(jobHandle, start, end).withSchedule(calendarSchedule).build();
    } else { // for cron expression create a cron trigger
      CronScheduleBuilder cronSchedule = CronScheduleBuilder.cronSchedule(frequency.getCronExpression())
        .withMisfireHandlingInstructionIgnoreMisfires();
      trigger = newTriggerBuilder(jobHandle, start, end).withSchedule(cronSchedule).build();
    }

    // Tell quartz to run the job using our trigger
    try {
      scheduler.scheduleJob(job, trigger);
    } catch (SchedulerException e) {
      // Pass the exception to the logger so the stack trace is not lost.
      log.error("Error scheduling job with jobHandle: {}", jobHandle, e);
      throw new LensException("Failed to schedule job with jobHandle: " + jobHandle, e);
    }
  }

  // Creates the part of the trigger definition shared by calendar-interval and cron schedules.
  private static TriggerBuilder<Trigger> newTriggerBuilder(String jobHandle, DateTime start, DateTime end) {
    return TriggerBuilder.newTrigger()
      .withIdentity(jobHandle, ALARM_SERVICE)
      .startAt(start.toDate())
      .endAt(end.toDate());
  }

  private int getTimeInterval(XFrequencyEnum frequencyEnum) {
    // since quarterly is not supported natively, we express it as 3 months
    return frequencyEnum == XFrequencyEnum.QUARTERLY ? 3 : 1;
  }

  // Maps the timeunit in entity specification to the one in Quartz DateBuilder
  private DateBuilder.IntervalUnit getTimeUnit(XFrequencyEnum frequencyEnum) {
    switch (frequencyEnum) {
    case DAILY:
      return DateBuilder.IntervalUnit.DAY;
    case WEEKLY:
      return DateBuilder.IntervalUnit.WEEK;
    case MONTHLY:
    case QUARTERLY: // expressed as an interval of 3 months, see getTimeInterval
      return DateBuilder.IntervalUnit.MONTH;
    case YEARLY:
      return DateBuilder.IntervalUnit.YEAR;
    default:
      throw new IllegalArgumentException("Invalid frequency enum expression: " + frequencyEnum.name());
    }
  }

  /**
   * Stops sending notifications for the given job handle by deleting its Quartz job.
   *
   * @param jobHandle handle of the job to remove
   * @return the result of the Quartz {@code deleteJob} call
   * @throws LensException if Quartz fails while deleting the job
   */
  public boolean unSchedule(SchedulerJobHandle jobHandle) throws LensException {
    // stop sending notifications for this job handle
    try {
      return scheduler.deleteJob(JobKey.jobKey(jobHandle.getHandleIdString(), LENS_JOBS));
    } catch (SchedulerException e) {
      log.error("Failed to remove alarm triggers for job with jobHandle: {}", jobHandle, e);
      throw new LensException("Failed to remove alarm triggers for job with jobHandle: " + jobHandle, e);
    }
  }

  /**
   * Checks whether a Quartz job is registered for the given handle.
   *
   * @param handle handle of the job to look up
   * @return the result of the Quartz {@code checkExists} call, or {@code false} if the lookup itself failed
   *         (the failure is logged, not propagated)
   * @throws LensException declared for interface compatibility; not thrown by the current implementation
   */
  public boolean checkExists(SchedulerJobHandle handle) throws LensException {
    try {
      return scheduler.checkExists(JobKey.jobKey(handle.getHandleIdString(), LENS_JOBS));
    } catch (SchedulerException e) {
      // The exception is not rethrown, so the log entry is the only record of the cause.
      log.error("Failed to check the job with jobHandle: {}", handle, e);
      return false;
    }
  }

  /**
   * Pauses the Quartz job registered for the given handle.
   *
   * @param jobHandle handle of the job to pause
   * @throws LensException if Quartz fails while pausing the job
   */
  public void pauseJob(SchedulerJobHandle jobHandle) throws LensException {
    try {
      scheduler.pauseJob(JobKey.jobKey(jobHandle.getHandleIdString(), LENS_JOBS));
    } catch (SchedulerException e) {
      log.error("Failed to pause alarm triggers for job with jobHandle: {}", jobHandle, e);
      throw new LensException("Failed to pause alarm triggers for job with jobHandle: " + jobHandle, e);
    }
  }

  /**
   * Resumes the Quartz job registered for the given handle.
   *
   * @param jobHandle handle of the job to resume
   * @throws LensException if Quartz fails while resuming the job
   */
  public void resumeJob(SchedulerJobHandle jobHandle) throws LensException {
    try {
      scheduler.resumeJob(JobKey.jobKey(jobHandle.getHandleIdString(), LENS_JOBS));
    } catch (SchedulerException e) {
      log.error("Failed to resume alarm triggers for job with jobHandle: {}", jobHandle, e);
      throw new LensException("Failed to resume alarm triggers for job with jobHandle: " + jobHandle, e);
    }
  }

  /**
   * Quartz job that turns a trigger firing into a {@link SchedulerAlarmEvent} on the Lens event service.
   */
  public static class LensJob implements Job {

    /**
     * Publishes a SCHEDULE event for the fired job and, when the trigger has no further fire time, an EXPIRE
     * event as well.
     *
     * @param jobExecutionContext Quartz context carrying the job data and the fire times
     * @throws JobExecutionException if the event service fails to accept an event
     */
    @Override
    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
      JobDataMap data = jobExecutionContext.getMergedJobDataMap();
      DateTime nominalTime = new DateTime(jobExecutionContext.getScheduledFireTime());
      SchedulerJobHandle jobHandle = SchedulerJobHandle.fromString(data.getString(JOB_HANDLE_KEY));
      SchedulerAlarmEvent alarmEvent = new SchedulerAlarmEvent(jobHandle, nominalTime,
        SchedulerAlarmEvent.EventType.SCHEDULE, null);
      try {
        LensEventService eventService = LensServices.get().getService(LensEventService.NAME);
        eventService.notifyEvent(alarmEvent);
        // No next fire time means this was the last firing of the trigger.
        if (jobExecutionContext.getNextFireTime() == null) {
          eventService
            .notifyEvent(new SchedulerAlarmEvent(jobHandle, nominalTime, SchedulerAlarmEvent.EventType.EXPIRE, null));
        }
      } catch (LensException e) {
        log.error("Failed to notify SchedulerAlarmEvent for jobHandle: {} and scheduleTime: {}",
          jobHandle.getHandleIdString(), nominalTime.toString(), e);
        throw new JobExecutionException("Failed to notify alarmEvent", e);
      }
    }
  }
}