blob: 4b47264f40a4568dc3eb498e7b5f2ad930e6e3d7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.service.modules.scheduler;
import java.io.IOException;
import java.net.URI;
import java.text.ParseException;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Calendar;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.TimeZone;
import org.apache.commons.lang.StringUtils;
import org.apache.helix.HelixManager;
import org.quartz.CronExpression;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.InterruptableJob;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.quartz.SchedulerException;
import org.quartz.Trigger;
import org.quartz.UnableToInterruptJobException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.codahale.metrics.MetricFilter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.ConfigurationKeys;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareGauge;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.ServiceMetricNames;
import org.apache.gobblin.runtime.JobException;
import org.apache.gobblin.runtime.api.FlowSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecCatalogListener;
import org.apache.gobblin.runtime.api.SpecNotFoundException;
import org.apache.gobblin.runtime.listeners.JobListener;
import org.apache.gobblin.runtime.metrics.RuntimeMetrics;
import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse;
import org.apache.gobblin.runtime.spec_catalog.FlowCatalog;
import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog;
import org.apache.gobblin.runtime.util.InjectionNames;
import org.apache.gobblin.scheduler.BaseGobblinJob;
import org.apache.gobblin.scheduler.JobScheduler;
import org.apache.gobblin.scheduler.SchedulerService;
import org.apache.gobblin.service.ServiceConfigKeys;
import org.apache.gobblin.service.modules.flowgraph.Dag;
import org.apache.gobblin.service.modules.orchestration.DagManagementStateStore;
import org.apache.gobblin.service.modules.orchestration.DagManager;
import org.apache.gobblin.service.modules.orchestration.FlowLaunchHandler;
import org.apache.gobblin.service.modules.orchestration.Orchestrator;
import org.apache.gobblin.service.modules.orchestration.UserQuotaManager;
import org.apache.gobblin.service.modules.spec.JobExecutionPlan;
import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper;
import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton;
import org.apache.gobblin.service.monitoring.FlowStatusGenerator;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PropertiesUtils;
import static org.apache.gobblin.service.ServiceConfigKeys.GOBBLIN_SERVICE_PREFIX;
/**
* An extension to {@link JobScheduler} that is also a {@link SpecCatalogListener}.
* {@link GobblinServiceJobScheduler} listens for new / updated {@link FlowSpec} and schedules
* and runs them via {@link Orchestrator}.
*/
@Alpha
@Singleton
@Slf4j
public class GobblinServiceJobScheduler extends JobScheduler implements SpecCatalogListener {
// Scheduler related configuration
// A boolean function indicating if current instance will handle DR traffic or not.
public static final String GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED = GOBBLIN_SERVICE_PREFIX + "drNominatedInstance";
// Logger injected via constructor (falls back to a class-scoped SLF4J logger when absent).
protected final Logger _log;
protected final Optional<FlowCatalog> flowCatalog;
protected final Optional<HelixManager> helixManager;
// Used to compile and orchestrate FlowSpecs once they are scheduled or run immediately.
protected final Orchestrator orchestrator;
protected final Boolean isWarmStandbyEnabled;
// Quota checks are applied to adhoc flows only when warm-standby mode is off (see onAddSpec).
protected final Optional<UserQuotaManager> quotaManager;
protected final Optional<FlowLaunchHandler> flowTriggerHandler;
// Keyed by FlowSpec URI string; values are the specs currently known to the scheduler.
@Getter
protected final Map<String, FlowSpec> scheduledFlowSpecs;
// Keyed by FlowSpec URI string; last modification time observed for each scheduled spec.
@Getter
protected final Map<String, Long> lastUpdatedTimeForFlowSpec;
protected volatile int loadSpecsBatchSize = -1;
protected int skipSchedulingFlowsAfterNumDays;
@Getter
private volatile boolean isActive;
private String serviceName;
// The following volatile values back the gauges below; -1 means "not yet measured".
private volatile Long perSpecGetRateValue = -1L;
private volatile Long timeToInitializeSchedulerValue = -1L;
private volatile Long timeToObtainSpecUrisValue = -1L;
private volatile Long individualGetSpecSpeedValue = -1L;
private volatile Long eachCompleteAddSpecValue = -1L;
private volatile Long eachSpecCompilationValue = -1L;
private volatile Long eachScheduleJobValue = -1L;
private volatile Long totalGetSpecTimeValue = -1L;
private volatile Long totalAddSpecTimeValue = -1L;
private volatile int numJobsScheduledDuringStartupValue = -1;
// Gauges reporting scheduler-startup timings. NOTE: these instance initializers read the static
// metricContext declared further down; that is safe because static fields initialize before
// instance fields, but keep the declaration order in mind when editing.
private final ContextAwareGauge getSpecsPerSpecRateNanos = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_GET_SPECS_DURING_STARTUP_PER_SPEC_RATE_NANOS, () -> this.perSpecGetRateValue);
private final ContextAwareGauge batchSize = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_LOAD_SPECS_BATCH_SIZE, () -> this.loadSpecsBatchSize);
private final ContextAwareGauge timeToInitalizeSchedulerNanos = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_TIME_TO_INITIALIZE_SCHEDULER_NANOS, () -> this.timeToInitializeSchedulerValue);
private final ContextAwareGauge timeToObtainSpecUrisNanos = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_TIME_TO_OBTAIN_SPEC_URIS_NANOS, () -> timeToObtainSpecUrisValue);
private final ContextAwareGauge individualGetSpecSpeedNanos = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_INDIVIDUAL_GET_SPEC_SPEED_NANOS, () -> individualGetSpecSpeedValue);
private final ContextAwareGauge eachCompleteAddSpecNanos = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_EACH_COMPLETE_ADD_SPEC_NANOS, () -> eachCompleteAddSpecValue);
private final ContextAwareGauge eachSpecCompilationNanos = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_EACH_SPEC_COMPILATION_NANOS, () -> eachSpecCompilationValue);
private final ContextAwareGauge eachScheduleJobNanos = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_EACH_SCHEDULE_JOB_NANOS, () -> eachScheduleJobValue);
private final ContextAwareGauge totalGetSpecTimeNanos = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_TOTAL_GET_SPEC_TIME_NANOS, () -> totalGetSpecTimeValue);
private final ContextAwareGauge totalAddSpecTimeNanos = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_TOTAL_ADD_SPEC_TIME_NANOS, () -> totalAddSpecTimeValue);
private final ContextAwareGauge numJobsScheduledDuringStartup = metricContext.newContextAwareGauge(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_NUM_JOBS_SCHEDULED_DURING_STARTUP, () -> numJobsScheduledDuringStartupValue);
// Shared (static) metric context and meters; referenced by the instance gauge initializers above.
private static final MetricContext metricContext = Instrumented.getMetricContext(new org.apache.gobblin.configuration.State(),
    GobblinServiceJobScheduler.class);
private static final ContextAwareMeter scheduledFlows = metricContext.contextAwareMeter(ServiceMetricNames.SCHEDULED_FLOW_METER);
private static final ContextAwareMeter nonScheduledFlows = metricContext.contextAwareMeter(ServiceMetricNames.NON_SCHEDULED_FLOW_METER);
/**
 * If current instances is nominated as a handler for DR traffic from down GaaS-Instance.
 * Note this is, currently, different from leadership change/fail-over handling, where the traffic could come
 * from GaaS instance out of current GaaS Cluster:
 * e.g. There are multi-datacenter deployment of GaaS Cluster. Intra-datacenter fail-over could be handled by
 * leadership change mechanism, while inter-datacenter fail-over would be handled by DR handling mechanism.
 */
private boolean isNominatedDRHandler;
/**
 * Use this to tag all DR-applicable FlowSpec entries in {@link org.apache.gobblin.runtime.api.SpecStore}
 * so only they would be loaded during DR handling.
 */
public static final String DR_FILTER_TAG = "dr";
/**
 * Primary (injected) constructor.
 *
 * @param serviceName name of this GaaS instance, used in log messages
 * @param config service configuration; batch size, skip-days, and DR nomination are read from it
 * @param helixManager optional Helix manager, used to detect cluster-mode readiness
 * @param flowCatalog optional catalog from which specs are loaded on activation
 * @param orchestrator compiles and orchestrates flows
 * @param schedulerService underlying Quartz scheduler service
 * @param quotaManager optional quota checker for adhoc flows
 * @param log optional logger override
 * @param isWarmStandbyEnabled whether warm-standby mode is on (quota checks then happen at the API layer)
 * @param flowTriggerHandler optional multi-active flow-launch handler
 */
@Inject
public GobblinServiceJobScheduler(@Named(InjectionNames.SERVICE_NAME) String serviceName,
    Config config,
    Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog,
    Orchestrator orchestrator, SchedulerService schedulerService, Optional<UserQuotaManager> quotaManager, Optional<Logger> log,
    @Named(InjectionNames.WARM_STANDBY_ENABLED) boolean isWarmStandbyEnabled,
    Optional<FlowLaunchHandler> flowTriggerHandler) throws Exception {
  super(ConfigUtils.configToProperties(config), schedulerService);
  _log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass());
  this.serviceName = serviceName;
  this.flowCatalog = flowCatalog;
  this.helixManager = helixManager;
  this.orchestrator = orchestrator;
  this.scheduledFlowSpecs = Maps.newHashMap();
  this.lastUpdatedTimeForFlowSpec = Maps.newHashMap();
  // Convert the config once instead of re-parsing it for every property read.
  Properties configProps = ConfigUtils.configToProperties(config);
  this.loadSpecsBatchSize = Integer.parseInt(configProps.getProperty(ConfigurationKeys.LOAD_SPEC_BATCH_SIZE, String.valueOf(ConfigurationKeys.DEFAULT_LOAD_SPEC_BATCH_SIZE)));
  this.skipSchedulingFlowsAfterNumDays = Integer.parseInt(configProps.getProperty(ConfigurationKeys.SKIP_SCHEDULING_FLOWS_AFTER_NUM_DAYS, String.valueOf(ConfigurationKeys.DEFAULT_NUM_DAYS_TO_SKIP_AFTER)));
  // BUGFIX: previously this evaluated hasPath() twice, so merely defining the key nominated the
  // instance as DR handler even when the value was explicitly false. Check presence, then value.
  this.isNominatedDRHandler = config.hasPath(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED)
      && config.getBoolean(GOBBLIN_SERVICE_SCHEDULER_DR_NOMINATED);
  this.isWarmStandbyEnabled = isWarmStandbyEnabled;
  this.quotaManager = quotaManager;
  this.flowTriggerHandler = flowTriggerHandler;
  // Check that these metrics do not exist before adding, mainly for testing purpose which creates multiple instances
  // of the scheduler. If one metric exists, then the others should as well.
  MetricFilter filter = MetricFilter.contains(RuntimeMetrics.GOBBLIN_JOB_SCHEDULER_GET_SPECS_DURING_STARTUP_PER_SPEC_RATE_NANOS);
  if (metricContext.getGauges(filter).isEmpty()) {
    metricContext.register(this.getSpecsPerSpecRateNanos);
    metricContext.register(this.batchSize);
    metricContext.register(this.timeToInitalizeSchedulerNanos);
    metricContext.register(this.timeToObtainSpecUrisNanos);
    metricContext.register(this.individualGetSpecSpeedNanos);
    metricContext.register(this.eachCompleteAddSpecNanos);
    metricContext.register(this.eachSpecCompilationNanos);
    metricContext.register(this.eachScheduleJobNanos);
    metricContext.register(this.totalGetSpecTimeNanos);
    metricContext.register(this.totalAddSpecTimeNanos);
    metricContext.register(this.numJobsScheduledDuringStartup);
  }
}
/**
 * Convenience constructor that builds the {@link Orchestrator} from its parts and delegates to the
 * injected constructor. All scheduler state is initialized there.
 */
public GobblinServiceJobScheduler(String serviceName, Config config, FlowStatusGenerator flowStatusGenerator,
    Optional<HelixManager> helixManager, Optional<FlowCatalog> flowCatalog, TopologyCatalog topologyCatalog,
    DagManager dagManager, Optional<UserQuotaManager> quotaManager, SchedulerService schedulerService,
    Optional<Logger> log, boolean isWarmStandbyEnabled, Optional <FlowLaunchHandler> flowTriggerDecorator,
    SharedFlowMetricsSingleton sharedFlowMetricsSingleton, Optional<DagManagementStateStore> dagManagementStateStore,
    FlowCompilationValidationHelper flowCompilationValidationHelper)
    throws Exception {
  this(serviceName, config, helixManager, flowCatalog,
      new Orchestrator(config, topologyCatalog, dagManager, log, flowStatusGenerator, flowTriggerDecorator,
          sharedFlowMetricsSingleton, flowCatalog, dagManagementStateStore, flowCompilationValidationHelper),
      schedulerService, quotaManager, log, isWarmStandbyEnabled, flowTriggerDecorator);
}
/**
 * Activates or deactivates this scheduler instance.
 *
 * On activation, asynchronously loads and schedules all specs from the {@link FlowCatalog}
 * (after the spec compiler reports healthy). On deactivation, unschedules all jobs and clears
 * the local spec cache. The order of the isActive assignment relative to the (un)scheduling work
 * is deliberate — see the inline comments — so spec-forwarding in onAddSpec/onDeleteSpec does not
 * loop back to this node.
 *
 * @param isActive desired leadership state; no-op if already in that state
 */
public synchronized void setActive(boolean isActive) {
  if (this.isActive == isActive) {
    // No-op if already in correct state
    return;
  }
  if (isActive) {
    // Need to set active=true first; otherwise in the onAddSpec(), node will forward specs to
    // the active node, which is itself.
    this.isActive = isActive;
    if (this.flowCatalog.isPresent()) {
      // Load specs asynchronously so activation does not block on catalog scanning.
      Thread scheduleSpec = new Thread(() -> {
        // Ensure compiler is healthy before attempting to schedule flows
        try {
          GobblinServiceJobScheduler.this.orchestrator.getSpecCompiler().awaitHealthy();
        } catch (InterruptedException e) {
          // Restore the interrupt flag before propagating so callers can still observe it.
          Thread.currentThread().interrupt();
          throw new RuntimeException(e);
        }
        scheduleSpecsFromCatalog();
      });
      scheduleSpec.start();
    }
  } else {
    // Since we are going to change status to isActive=false, unschedule all flows
    try {
      this.scheduledFlowSpecs.clear();
      unscheduleAllJobs();
    } catch (SchedulerException e) {
      // No-arg String.format was pointless here; log the plain message.
      _log.error("Not all jobs were unscheduled", e);
      // We want to avoid duplicate flow execution, so fail loudly
      throw new RuntimeException(e);
    }
    // Need to set active=false at the end; otherwise in the onDeleteSpec(), node will forward
    // specs to the active node, which is itself.
    this.isActive = isActive;
  }
}
/**
 * Return true if a spec should be scheduled and, if it is, modify the spec of an adhoc flow
 * before adding it to the scheduler. Return false otherwise.
 *
 * BUGFIX: the "outside of threshold" debug log was previously attached to the wrong branch —
 * it fired when the spec was not a FlowSpec at all, and never fired when a FlowSpec's next run
 * was actually beyond the scheduling threshold.
 *
 * @param spec candidate spec; only {@link FlowSpec}s are eligible for scheduling
 * @return true if the spec was handed to {@link #onAddSpec}, false otherwise
 */
private boolean addSpecHelperMethod(Spec spec) {
  if (!(spec instanceof FlowSpec)) {
    return false;
  }
  FlowSpec flowSpec = (FlowSpec) spec;
  // Adhoc flows will not have any job schedule key, but we should schedule them
  if (flowSpec.getConfig().hasPath(ConfigurationKeys.JOB_SCHEDULE_KEY) && !isWithinRange(
      flowSpec.getConfig().getString(ConfigurationKeys.JOB_SCHEDULE_KEY), this.skipSchedulingFlowsAfterNumDays)) {
    _log.debug("Not scheduling spec {} during startup as next job to schedule is outside of threshold.", spec);
    return false;
  }
  // Disable FLOW_RUN_IMMEDIATELY on service startup or leadership change if the property is set to true
  if (PropertiesUtils.getPropAsBoolean(flowSpec.getConfigAsProperties(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
    onAddSpec(disableFlowRunImmediatelyOnStart(flowSpec));
  } else {
    onAddSpec(spec);
  }
  return true;
}
/**
 * Returns true if the next run for the given cron schedule is sooner than the threshold to skip
 * scheduling after, false otherwise. If the cron expression cannot be parsed, or the next run
 * cannot be calculated (expression out of range), returns false so expired/invalid flows are not
 * scheduled. An empty expression returns true to capture adhoc flows.
 *
 * @param cronExpression Quartz cron expression, evaluated in UTC
 * @param maxNumDaysToScheduleWithin threshold in days
 * @return true iff the flow should be scheduled now
 */
@VisibleForTesting
public static boolean isWithinRange(String cronExpression, int maxNumDaysToScheduleWithin) {
  if (cronExpression.trim().isEmpty()) {
    // If the cron expression is empty return true to capture adhoc flows
    return true;
  }
  // double divisor so the day count below is computed with fractional precision before rounding
  final double numMillisInADay = 86400000;
  try {
    CronExpression cron = new CronExpression(cronExpression);
    cron.setTimeZone(TimeZone.getTimeZone("UTC"));
    Date nextValidTimeAfter = cron.getNextValidTimeAfter(new Date());
    if (nextValidTimeAfter == null) {
      log.warn("Next valid time doesn't exist since it's out of range for expression: {}. ",
          cronExpression);
      // nextValidTimeAfter will be null in cases only when CronExpression is outdated for a given range
      // this will cause NullPointerException while scheduling FlowSpecs from FlowCatalog
      // Hence, returning false to avoid expired flows from being scheduled
      return false;
    }
    // The Calendar round-trip used previously was a no-op: getTimeInMillis() of a Calendar set
    // from the Date equals Date.getTime() regardless of the Calendar's zone.
    long diff = nextValidTimeAfter.getTime() - System.currentTimeMillis();
    return (int) Math.round(diff / numMillisInADay) < maxNumDaysToScheduleWithin;
  } catch (ParseException e) {
    // BUGFIX: log through SLF4J instead of printStackTrace() so the failure reaches the service logs.
    log.warn("Cannot parse cron expression: {}, not scheduling flow", cronExpression, e);
    // Return false when a parsing exception occurs due to invalid cron
    return false;
  }
}
/**
 * Load all {@link FlowSpec}s from {@link FlowCatalog} as one of the initialization steps,
 * and make schedulers be aware of that.
 *
 * If it is newly brought up as the DR handler, will load additional FlowSpecs and handle transition properly.
 */
private void scheduleSpecsFromCatalog() {
  int numSpecs = this.flowCatalog.get().getSize();
  int actualNumFlowsScheduled = 0;
  _log.info("Scheduling specs from catalog: {} flows in the catalog, will skip scheduling flows with next run after "
      + "{} days", numSpecs, this.skipSchedulingFlowsAfterNumDays);
  long startTime = System.nanoTime();
  long totalGetTime = 0;
  long totalAddSpecTime = 0;
  Iterator<URI> uriIterator;
  HashSet<URI> urisLeftToSchedule = new HashSet<>();
  try {
    uriIterator = this.flowCatalog.get().getSpecURIs();
    while (uriIterator.hasNext()) {
      urisLeftToSchedule.add(uriIterator.next());
    }
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
  this.timeToObtainSpecUrisValue = System.nanoTime() - startTime;
  try {
    // If current instance is nominated as DR handler, take additional URIs from the FlowCatalog.
    if (isNominatedDRHandler) {
      // Synchronously clean the execution state for DR-applicable FlowSpecs
      // before rescheduling them again in the nominated DR handler.
      Iterator<URI> drUris = this.flowCatalog.get().getSpecURISWithTag(DR_FILTER_TAG);
      clearRunningFlowState(drUris);
    }
  } catch (IOException e) {
    throw new RuntimeException("Failed to get Spec URIs with tag to clear running flow state", e);
  }
  int startOffset = 0;
  long batchGetStartTime;
  long batchGetEndTime;
  while (startOffset < numSpecs) {
    batchGetStartTime = System.nanoTime();
    Collection<Spec> batchOfSpecs = this.flowCatalog.get().getSpecsPaginated(startOffset, this.loadSpecsBatchSize);
    Iterator<Spec> batchOfSpecsIterator = batchOfSpecs.iterator();
    batchGetEndTime = System.nanoTime();
    while (batchOfSpecsIterator.hasNext()) {
      Spec spec = batchOfSpecsIterator.next();
      try {
        if (addSpecHelperMethod(spec)) {
          totalAddSpecTime += this.eachCompleteAddSpecValue; // this is updated by each call to onAddSpec
          actualNumFlowsScheduled += 1;
        }
      } catch (Exception e) {
        // If there is an uncaught error thrown during compilation, log it and continue adding flows
        _log.error("Could not schedule spec {} from flowCatalog due to ", spec, e);
      }
      urisLeftToSchedule.remove(spec.getUri());
    }
    // BUGFIX: capture "is this the first batch" BEFORE advancing the offset; previously the
    // startOffset == 0 check ran after the increment and so was always false.
    boolean isFirstBatch = startOffset == 0;
    startOffset += this.loadSpecsBatchSize;
    totalGetTime += batchGetEndTime - batchGetStartTime;
    // Don't skew the average get spec time value with the last batch that may be very small.
    // Guard against an empty batch (possible if specs are deleted mid-load) to avoid divide-by-zero.
    if (!batchOfSpecs.isEmpty()
        && (isFirstBatch || batchOfSpecs.size() >= Math.round(0.75 * this.loadSpecsBatchSize))) {
      perSpecGetRateValue = (batchGetEndTime - batchGetStartTime) / batchOfSpecs.size();
    }
  }
  // Ensure we did not miss any specs due to ordering changing (deletions/insertions) while loading
  Iterator<URI> urisLeft = urisLeftToSchedule.iterator();
  long individualGetSpecStartTime;
  while (urisLeft.hasNext()) {
    URI uri = urisLeft.next();
    try {
      individualGetSpecStartTime = System.nanoTime();
      Spec spec = this.flowCatalog.get().getSpecWrapper(uri);
      this.individualGetSpecSpeedValue = System.nanoTime() - individualGetSpecStartTime;
      totalGetTime += this.individualGetSpecSpeedValue;
      if (addSpecHelperMethod(spec)) {
        totalAddSpecTime += this.eachCompleteAddSpecValue; // this is updated by each call to onAddSpec
        actualNumFlowsScheduled += 1;
      }
    } catch (Exception e) {
      // BUGFIX: dropped the trailing "{}" placeholder so SLF4J treats the exception as a
      // Throwable and logs its stack trace instead of only its toString().
      _log.error("Could not schedule spec uri {} from flowCatalog due to ", uri, e);
    }
  }
  // Reset values after their last use to get an accurate reading on the gauges
  this.perSpecGetRateValue = -1L;
  this.individualGetSpecSpeedValue = -1L;
  this.totalGetSpecTimeValue = totalGetTime;
  this.totalAddSpecTimeValue = totalAddSpecTime;
  this.numJobsScheduledDuringStartupValue = actualNumFlowsScheduled;
  this.flowCatalog.get().getMetrics().updateGetSpecTime(startTime);
  this.timeToInitializeSchedulerValue = System.nanoTime() - startTime;
}
/**
 * In DR-mode, the running {@link FlowSpec}s will all be cancelled and rescheduled.
 * Clears each running {@link FlowSpec}'s state (killing the corresponding running jobs via the
 * delete callback) before they are rescheduled.
 *
 * @param drUris the DR-applicable uris discovered from the FlowCatalog
 */
private void clearRunningFlowState(Iterator<URI> drUris) {
  // TODO: Instead of simply call onDeleteSpec, a callback when FlowSpec is deleted from FlowCatalog, should also kill Azkaban Flow from AzkabanSpecProducer.
  drUris.forEachRemaining(drUri -> onDeleteSpec(drUri, FlowSpec.Builder.DEFAULT_VERSION));
}
/**
 * Returns a copy of the given {@link FlowSpec} whose FLOW_RUN_IMMEDIATELY property is forced to
 * "false", so flows are not re-executed merely because the service started or leadership changed.
 */
@VisibleForTesting
protected static Spec disableFlowRunImmediatelyOnStart(FlowSpec spec) {
  Properties overriddenProps = spec.getConfigAsProperties();
  overriddenProps.setProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false");
  Config overriddenConfig = ConfigFactory.parseProperties(overriddenProps);
  return new FlowSpec(spec.getUri(), spec.getVersion(), spec.getDescription(), overriddenConfig,
      overriddenProps, spec.getTemplateURIs(), spec.getChildSpecs());
}
// No scheduler-specific startup work; delegate to JobScheduler's lifecycle.
@Override
protected void startUp() throws Exception {
  super.startUp();
}
/**
 * Synchronize the job scheduling because the same flowSpec can be scheduled by different threads.
 * Attaches the cached {@link FlowSpec} (keyed by job name) to the Quartz job data map before
 * delegating to the parameterized scheduleJob overload.
 *
 * @throws JobException if the underlying scheduler rejects the job
 */
@Override
public synchronized void scheduleJob(Properties jobProps, JobListener jobListener) throws JobException {
  String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
  Map<String, Object> jobDataMapExtras = Maps.newHashMap();
  jobDataMapExtras.put(ServiceConfigKeys.GOBBLIN_SERVICE_FLOWSPEC, this.scheduledFlowSpecs.get(jobName));
  try {
    scheduleJob(jobProps, jobListener, jobDataMapExtras, GobblinServiceJob.class);
  } catch (Exception e) {
    throw new JobException("Failed to schedule job " + jobName, e);
  }
}
/** Logs the next fire time (epoch-ms UTC) for a job that was just registered with the scheduler. */
@Override
protected void logNewlyScheduledJob(JobDetail job, Trigger trigger) {
  Properties jobProps = (Properties) job.getJobDataMap().get(PROPERTIES_KEY);
  long nextFireEpochMillis = utcDateAsUTCEpochMillis(trigger.getNextFireTime());
  log.info(jobSchedulerTracePrefixBuilder(jobProps) + "nextTriggerTime: {} - Job newly scheduled",
      nextFireEpochMillis);
}
/** Builds the common log prefix used by scheduler trigger tracing, tagged with flow name/group. */
protected static String jobSchedulerTracePrefixBuilder(Properties jobProps) {
  String flowName = jobProps.getProperty(ConfigurationKeys.FLOW_NAME_KEY, "<<no flow name>>");
  String flowGroup = jobProps.getProperty(ConfigurationKeys.FLOW_GROUP_KEY, "<<no flow group>>");
  return String.format("Scheduler trigger tracing (in epoch-ms UTC): [flowName: %s flowGroup: %s] - ",
      flowName, flowGroup);
}
/**
 * Takes a Date object whose wall-clock reading is meant in the system default time zone and
 * reinterprets that wall-clock time as UTC, returning the resulting milliseconds since epoch.
 * @param date date to reinterpret
 */
public static long systemDefaultZoneDateAsUTCEpochMillis(Date date) {
  // Read the instant as local wall-clock time, then treat that wall-clock value as if it were UTC.
  LocalDateTime localWallClock = LocalDateTime.ofInstant(date.toInstant(), ZoneId.systemDefault());
  return localWallClock.toInstant(ZoneOffset.UTC).toEpochMilli();
}
/**
 * Takes a Date object in UTC and returns the number of milliseconds since epoch.
 * @param date date to convert
 */
public static long utcDateAsUTCEpochMillis(Date date) {
  // Date already stores millis since the UTC epoch, so no zone conversion is required.
  return date.getTime();
}
/**
 * Orchestrates the cached {@link FlowSpec} for the given job immediately.
 * The trigger event time defaults when absent (adhoc / run-immediately flows carry no trigger time).
 *
 * @throws JobException wrapping any failure during orchestration
 */
@Override
public void runJob(Properties jobProps, JobListener jobListener) throws JobException {
  String jobName = jobProps.getProperty(ConfigurationKeys.JOB_NAME_KEY);
  try {
    FlowSpec flowSpec = this.scheduledFlowSpecs.get(jobName);
    // The trigger event time will be missing for adhoc and run-immediately flows, so we set the default here
    long triggerTimestampMillis = Long.parseLong(jobProps.getProperty(
        ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
        ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_DEFAULT_VAL));
    boolean isReminderEvent =
        Boolean.parseBoolean(jobProps.getProperty(ConfigurationKeys.FLOW_IS_REMINDER_EVENT_KEY, "false"));
    this.orchestrator.orchestrate(flowSpec, jobProps, triggerTimestampMillis, isReminderEvent);
  } catch (Exception e) {
    String exceptionPrefix = "Failed to run Spec: " + jobName;
    log.warn(exceptionPrefix + " because", e);
    throw new JobException(exceptionPrefix, e);
  }
}
/**
 * Handles a newly-added (or re-announced) spec: compiles it, and — when this instance is the
 * active leader — caches it, schedules it if it carries a cron schedule, or runs it immediately
 * otherwise. Also updates the per-call timing gauges.
 *
 * @param addedSpec spec to be added
 * @return add spec response, which contains <code>null</code> if there is an error
 */
@Override
public AddSpecResponse onAddSpec(Spec addedSpec) {
  long startTime = System.nanoTime();
  if (this.helixManager.isPresent() && !this.helixManager.get().isConnected()) {
    // Specs in store will be notified when Scheduler is added as listener to FlowCatalog, so ignore
    // .. Specs if in cluster mode and Helix is not yet initialized
    _log.info("System not yet initialized. Skipping Spec Addition: " + addedSpec);
    return null;
  }
  _log.info("New Flow Spec detected: " + addedSpec);
  // Only FlowSpecs are schedulable; anything else is ignored.
  if (!(addedSpec instanceof FlowSpec)) {
    return null;
  }
  FlowSpec flowSpec = (FlowSpec) addedSpec;
  URI flowSpecUri = flowSpec.getUri();
  Properties jobConfig = createJobConfig(flowSpec);
  boolean isExplain = flowSpec.isExplain();
  String response = null;
  long compileStartTime = System.nanoTime();
  // always try to compile the flow to verify if it is compilable
  Dag<JobExecutionPlan> dag = this.orchestrator.getSpecCompiler().compileFlow(flowSpec);
  this.eachSpecCompilationValue = System.nanoTime() - compileStartTime;
  // If dag is null then a compilation error has occurred
  if (dag != null && !dag.isEmpty()) {
    response = dag.toString();
  }
  boolean compileSuccess = FlowCatalog.isCompileSuccessful(response);
  // Explain-only requests, failed compilations, and non-leader instances stop here.
  if (isExplain || !compileSuccess || !this.isActive) {
    // todo: in case of a scheduled job, we should also check if the job schedule is a valid cron schedule
    // so it can be scheduled
    _log.info("Ignoring the spec {}. isExplain: {}, compileSuccess: {}, master: {}",
        addedSpec, isExplain, compileSuccess, this.isActive);
    return new AddSpecResponse<>(response);
  }
  // Check quota limits against adhoc flows before saving the schedule
  // In warm standby mode, this quota check will happen on restli API layer when we accept the flow
  if (!this.isWarmStandbyEnabled && !jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
    // This block should be reachable only for the execution for the adhoc flows
    // For flow that has scheduler but run-immediately set to be true, we won't check the quota as we will use a different execution id later
    if (quotaManager.isPresent()) {
      // QuotaManager has idempotent checks for a dagNode, so this check won't double add quotas for a flow in the DagManager
      try {
        // dag is non-null here: compileSuccess implies a non-null, non-empty dag (see above).
        quotaManager.get().checkQuota(dag.getStartNodes());
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
    }
  }
  // Compare the modification timestamp of the spec being added if the scheduler is being initialized, ideally we
  // don't even want to do the same update twice as it will kill the existing flow and reschedule it unnecessarily
  Long modificationTime = Long.valueOf(flowSpec.getConfigAsProperties().getProperty(FlowSpec.MODIFICATION_TIME_KEY, "0"));
  String uriString = flowSpec.getUri().toString();
  Boolean isRunImmediately = PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false");
  // If the modification time is 0 (which means the original API was used to retrieve spec or warm standby mode is not
  // enabled), spec not in scheduler, or have a modification time associated with it assume it's the most recent
  if (modificationTime != 0L && this.scheduledFlowSpecs.containsKey(uriString)
      && this.lastUpdatedTimeForFlowSpec.containsKey(uriString)) {
    // For run-immediately flows with a schedule the modified_time would remain the same
    if (this.lastUpdatedTimeForFlowSpec.get(uriString).compareTo(modificationTime) > 0
        || (this.lastUpdatedTimeForFlowSpec.get(uriString).equals(modificationTime) && !isRunImmediately)) {
      _log.warn("Ignoring the spec {} modified at time {} because we have a more updated version from time {}",
          addedSpec, modificationTime, this.lastUpdatedTimeForFlowSpec.get(uriString));
      this.eachCompleteAddSpecValue = System.nanoTime() - startTime;
      return new AddSpecResponse(response);
    }
  }
  // todo : we should probably not schedule a flow if it is a runOnce flow
  this.scheduledFlowSpecs.put(flowSpecUri.toString(), flowSpec);
  this.lastUpdatedTimeForFlowSpec.put(flowSpecUri.toString(), modificationTime);
  if (jobConfig.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY)) {
    _log.info("{} Scheduling flow spec: {} ", this.serviceName, addedSpec);
    try {
      long scheduleStartTime = System.nanoTime();
      scheduleJob(jobConfig, null);
      this.eachScheduleJobValue = System.nanoTime() - scheduleStartTime;
    } catch (JobException je) {
      _log.error("{} Failed to schedule or run FlowSpec {}", serviceName, addedSpec, je);
      // Roll back the cache entries added above so a failed schedule leaves no stale state.
      this.scheduledFlowSpecs.remove(addedSpec.getUri().toString());
      this.lastUpdatedTimeForFlowSpec.remove(flowSpecUri.toString());
      this.eachCompleteAddSpecValue = System.nanoTime() - startTime;
      return null;
    }
    if (PropertiesUtils.getPropAsBoolean(jobConfig, ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false")) {
      _log.info("RunImmediately requested, hence executing FlowSpec: " + addedSpec);
      this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, false, jobConfig, null));
    }
  } else {
    _log.info("No FlowSpec schedule found, so running FlowSpec: " + addedSpec);
    this.jobExecutor.execute(new NonScheduledJobRunner(flowSpecUri, true, jobConfig, null));
  }
  this.eachCompleteAddSpecValue = System.nanoTime() - startTime;
  return new AddSpecResponse<>(response);
}
/**
 * Remove a flowSpec from the schedule.
 * Unlike onDeleteSpec, we want to avoid deleting the flowSpec on the executor,
 * and we still want to unschedule if we cannot connect to zookeeper as the current node cannot be the master.
 *
 * @param specURI URI of the spec to unschedule
 * @param specVersion version, used only for logging
 * @throws JobException if the spec is not present in the local cache
 */
private void unscheduleSpec(URI specURI, String specVersion) throws JobException {
  String specKey = specURI.toString();
  if (!this.scheduledFlowSpecs.containsKey(specKey)) {
    throw new JobException(String.format(
        "Spec with URI: %s was not found in cache. May be it was cleaned, if not please clean it manually",
        specURI));
  }
  _log.info("Unscheduling flowSpec " + specURI + "/" + specVersion);
  this.scheduledFlowSpecs.remove(specKey);
  this.lastUpdatedTimeForFlowSpec.remove(specKey);
  unscheduleJob(specKey);
  try {
    // Best-effort trace logging with the flow name/group from the catalog copy of the spec.
    FlowSpec spec = this.flowCatalog.get().getSpecs(specURI);
    _log.info(jobSchedulerTracePrefixBuilder(spec.getConfigAsProperties()) + "Unscheduled Spec");
  } catch (SpecNotFoundException e) {
    _log.warn("Unable to retrieve spec for URI {}", specURI);
  }
}
/** Convenience overload of {@link #onDeleteSpec(URI, String, Properties)} with empty headers. */
public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) {
  onDeleteSpec(deletedSpecURI, deletedSpecVersion, new Properties());
}
/**
 * {@inheritDoc}
 *
 * On the active leader, unschedules the spec locally and asks the orchestrator to remove it.
 * Ignored when Helix is configured but not yet connected, or on non-leader hosts.
 */
@Override
public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion, Properties headers) {
  if (this.helixManager.isPresent() && !this.helixManager.get().isConnected()) {
    // Specs in store will be notified when Scheduler is added as listener to FlowCatalog, so ignore
    // .. Specs if in cluster mode and Helix is not yet initialized
    _log.info("System not yet initialized. Skipping Spec Deletion: " + deletedSpecURI);
    return;
  }
  _log.info("Spec deletion detected: " + deletedSpecURI + "/" + deletedSpecVersion);
  if (!this.isActive) {
    _log.info("Skipping deletion of this spec {}/{} for non-leader host", deletedSpecURI, deletedSpecVersion);
    return;
  }
  try {
    // May be null if the spec was never cached; unscheduleSpec throws in that case, so the
    // null is never handed to orchestrator.remove below.
    Spec deletedSpec = this.scheduledFlowSpecs.get(deletedSpecURI.toString());
    unscheduleSpec(deletedSpecURI, deletedSpecVersion);
    this.orchestrator.remove(deletedSpec, headers);
  } catch (JobException | IOException e) {
    // BUGFIX: corrected the garbled message ("cleaning" -> "cleanly") and switched to
    // parameterized logging instead of String.format.
    _log.warn("Spec with URI: {} was not unscheduled cleanly", deletedSpecURI, e);
  }
}
/**
 * {@inheritDoc}
 *
 * An update is treated as a re-add: {@link #onAddSpec(Spec)} performs the compile, the
 * modification-time comparison, and the (re)scheduling.
 */
@Override
public void onUpdateSpec(Spec updatedSpec) {
  if (this.helixManager.isPresent() && !this.helixManager.get().isConnected()) {
    // Specs in store will be notified when Scheduler is added as listener to FlowCatalog, so ignore
    // .. Specs if in cluster mode and Helix is not yet initialized
    _log.info("System not yet initialized. Skipping Spec Update: " + updatedSpec);
    return;
  }
  _log.info("Spec changed: " + updatedSpec);
  if (!(updatedSpec instanceof FlowSpec)) {
    return;
  }
  try {
    onAddSpec(updatedSpec);
  } catch (Exception e) {
    _log.error("Failed to update Spec: " + updatedSpec, e);
  }
}
/**
 * Builds the Quartz job configuration for the given flow spec by layering the flow's own
 * properties over the scheduler-wide defaults.
 */
private Properties createJobConfig(FlowSpec flowSpec) {
  Properties jobConfig = new Properties();
  Properties flowSpecProperties = flowSpec.getConfigAsProperties();
  jobConfig.putAll(this.properties);
  jobConfig.setProperty(ConfigurationKeys.JOB_NAME_KEY, flowSpec.getUri().toString());
  // Fix: Config.getString returns the raw string value; the previous
  // getValue(...).toString() yields a ConfigValue debug rendering, not the plain value.
  // Like getValue, getString still throws if the key is missing.
  jobConfig.setProperty(ConfigurationKeys.JOB_GROUP_KEY,
      flowSpec.getConfig().getString(ConfigurationKeys.FLOW_GROUP_KEY));
  jobConfig.setProperty(ConfigurationKeys.FLOW_RUN_IMMEDIATELY,
      ConfigUtils.getString(flowSpec.getConfig(), ConfigurationKeys.FLOW_RUN_IMMEDIATELY, "false"));
  // todo : we should check if the job schedule is a valid cron schedule
  if (flowSpecProperties.containsKey(ConfigurationKeys.JOB_SCHEDULE_KEY) && StringUtils.isNotBlank(
      flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY))) {
    jobConfig.setProperty(ConfigurationKeys.JOB_SCHEDULE_KEY,
        flowSpecProperties.getProperty(ConfigurationKeys.JOB_SCHEDULE_KEY));
  }
  // Note: the default values for missing flow name/group are different than the ones above to easily identify where
  // the values are not present initially
  jobConfig.setProperty(ConfigurationKeys.FLOW_NAME_KEY,
      flowSpecProperties.getProperty(ConfigurationKeys.FLOW_NAME_KEY, "<<missing flow name>>"));
  jobConfig.setProperty(ConfigurationKeys.FLOW_GROUP_KEY,
      flowSpecProperties.getProperty(ConfigurationKeys.FLOW_GROUP_KEY, "<<missing flow group>>"));
  return jobConfig;
}
/**
 * A Gobblin job to be scheduled.
 */
@DisallowConcurrentExecution
@Slf4j
public static class GobblinServiceJob extends BaseGobblinJob implements InterruptableJob {
  // Fix: the class previously declared its own "_log" field alongside Lombok's @Slf4j-generated
  // "log" and used both inconsistently. Both referred to the same logger, so the duplicate field
  // is removed and "log" is used throughout.

  @Override
  public void executeImpl(JobExecutionContext context) throws JobExecutionException {
    try {
      // TODO: move this out of the try clause after locating the NPE source
      JobDetail jobDetail = context.getJobDetail();
      log.info("Starting FlowSpec " + jobDetail.getKey());

      JobDataMap dataMap = jobDetail.getJobDataMap();
      JobScheduler jobScheduler = (JobScheduler) dataMap.get(JOB_SCHEDULER_KEY);
      Properties jobProps = (Properties) dataMap.get(PROPERTIES_KEY);
      JobListener jobListener = (JobListener) dataMap.get(JOB_LISTENER_KEY);

      // Obtain trigger timestamp from trigger to pass to jobProps
      Trigger trigger = context.getTrigger();
      // THIS current event has already fired if this method is called, so it now exists in <previousFireTime>
      long triggerTimeMillis = utcDateAsUTCEpochMillis(trigger.getPreviousFireTime());

      // If the trigger is a reminder type event then utilize the trigger time saved in job
      // properties rather than the actual firing time
      if (jobDetail.getKey().getName().contains("reminder")) {
        String preservedConsensusEventTime = jobProps.getProperty(
            ConfigurationKeys.SCHEDULER_PRESERVED_CONSENSUS_EVENT_TIME_MILLIS_KEY, "0");
        String expectedReminderTime = jobProps.getProperty(
            ConfigurationKeys.SCHEDULER_EXPECTED_REMINDER_TIME_MILLIS_KEY, "0");
        log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime: {} expectedReminderTime: {} - Reminder job"
            + " triggered by scheduler at {}", preservedConsensusEventTime, expectedReminderTime,
            triggerTimeMillis);
        // TODO: add a metric if expected reminder time far exceeds system time
        jobProps.setProperty(ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY, preservedConsensusEventTime);
      } else {
        jobProps.setProperty(ConfigurationKeys.ORCHESTRATOR_TRIGGER_EVENT_TIME_MILLIS_KEY,
            String.valueOf(triggerTimeMillis));
        // NOTE(review): getNextFireTime() may be null for a trigger's final firing — assumed
        // utcDateAsUTCEpochMillis tolerates null; confirm against its implementation.
        log.info(jobSchedulerTracePrefixBuilder(jobProps) + "triggerTime: {} nextTriggerTime: {} - Job triggered by "
            + "scheduler", triggerTimeMillis, utcDateAsUTCEpochMillis(trigger.getNextFireTime()));
      }
      jobScheduler.runJob(jobProps, jobListener);
    } catch (Throwable t) {
      if (t instanceof NullPointerException) {
        log.warn("NullPointerException encountered while trying to execute flow. Message: " + t.getMessage(), t);
      }
      // Wrap every failure so Quartz sees a JobExecutionException, preserving the cause.
      throw new JobExecutionException(t);
    } finally {
      // Count every execution attempt, successful or not.
      scheduledFlows.mark();
    }
  }

  @Override
  public void interrupt() throws UnableToInterruptJobException {
    // No cooperative-cancellation state exists; interruption is merely recorded.
    log.info("Job was interrupted");
  }
}
/**
 * This class is responsible for running non-scheduled jobs.
 */
class NonScheduledJobRunner implements Runnable {
  private final URI specUri;
  private final Properties jobConfig;
  private final JobListener jobListener;
  // Whether to remove the spec from the scheduler's caches after the job has been orchestrated.
  private final boolean removeSpec;

  public NonScheduledJobRunner(URI uri, boolean removeSpec, Properties jobConfig, JobListener jobListener) {
    this.specUri = uri;
    this.jobConfig = jobConfig;
    this.jobListener = jobListener;
    this.removeSpec = removeSpec;
  }

  @Override
  public void run() {
    try {
      GobblinServiceJobScheduler.this.runJob(this.jobConfig, this.jobListener);
      if (flowCatalog.isPresent() && removeSpec) {
        Object syncObject = GobblinServiceJobScheduler.this.flowCatalog.get().getSyncObject(specUri.toString());
        if (syncObject != null) {
          // if the sync object does not exist, this job must be set to run due to job submission at service restart
          synchronized (syncObject) {
            // Wait until the catalog actually contains the spec before removing it from the caches.
            while (!GobblinServiceJobScheduler.this.flowCatalog.get().exists(specUri)) {
              syncObject.wait();
            }
          }
        }
        // Note that we only remove the spec from the flow catalog after it is orchestrated
        GobblinServiceJobScheduler.this.scheduledFlowSpecs.remove(specUri.toString());
        GobblinServiceJobScheduler.this.lastUpdatedTimeForFlowSpec.remove(specUri.toString());
      }
    } catch (JobException je) {
      _log.error("Failed to run job " + this.jobConfig.getProperty(ConfigurationKeys.JOB_NAME_KEY), je);
    } catch (InterruptedException e) {
      // Fix: restore the interrupt flag so the owning executor/thread can observe the interrupt.
      Thread.currentThread().interrupt();
      _log.error("Failed to delete the spec " + specUri, e);
    } finally {
      nonScheduledFlows.mark();
    }
  }
}
}