/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.usergrid.batch.service;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.google.inject.Injector;
import org.apache.usergrid.persistence.core.metrics.MetricsFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.usergrid.batch.Job;
import org.apache.usergrid.batch.JobExecution;
import org.apache.usergrid.batch.JobExecution.Status;
import org.apache.usergrid.batch.JobExecutionImpl;
import org.apache.usergrid.batch.JobFactory;
import org.apache.usergrid.batch.JobNotFoundException;
import org.apache.usergrid.batch.repository.JobAccessor;
import org.apache.usergrid.batch.repository.JobDescriptor;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
import com.google.common.util.concurrent.AbstractScheduledService;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningScheduledExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
/**
* Service that schedules itself on a fixed delay, then schedules jobs in the same thread pool
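*
* <p>A minimal wiring sketch, assuming the collaborators (job accessor, job factory, Guice injector)
* are provided elsewhere; the variable names below are illustrative only:</p>
*
* <pre>{@code
* JobSchedulerService schedulerService = new JobSchedulerService();
* schedulerService.setJobAccessor( jobAccessor ); // persistence for job state
* schedulerService.setJobFactory( jobFactory );   // builds Job instances from JobDescriptors
* schedulerService.setWorkerSize( 4 );            // worker pool size and semaphore capacity
* schedulerService.setInterval( 1000 );           // poll delay in milliseconds
* schedulerService.setInjector( injector );       // used to look up the MetricsFactory
* schedulerService.startAsync().awaitRunning();   // standard Guava Service lifecycle
* }</pre>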
*/
public class JobSchedulerService extends AbstractScheduledService {
protected static final long DEFAULT_DELAY = 1000;
private static final Logger logger = LoggerFactory.getLogger( JobSchedulerService.class );
private long interval = DEFAULT_DELAY;
private int workerSize = 1;
private int maxFailCount = 10;
private JobAccessor jobAccessor;
private JobFactory jobFactory;
private Semaphore capacitySemaphore;
private ListeningScheduledExecutorService service;
private JobListener jobListener;
private Timer jobTimer;
private Counter runCounter;
private Counter successCounter;
private Counter failCounter;
private Injector injector;
//TODO Add meters for throughput of start and stop
public JobSchedulerService() { }
@Override
protected void runOneIteration() throws Exception {
MetricsFactory metricsFactory = injector.getInstance( MetricsFactory.class );
jobTimer = metricsFactory.getTimer( JobSchedulerService.class, "scheduler.job_execution_timer" );
runCounter = metricsFactory.getCounter( JobSchedulerService.class, "scheduler.running_workers" );
successCounter = metricsFactory.getCounter( JobSchedulerService.class, "scheduler.successful_jobs" );
failCounter = metricsFactory.getCounter( JobSchedulerService.class, "scheduler.failed_jobs" );
try {
logger.info( "Running one check iteration ..." );
List<JobDescriptor> activeJobs;
// run until there are no more active jobs
while ( true ) {
// get the semaphore if we can. This means we have space for at least 1
// job
if ( logger.isDebugEnabled() ) {
logger.debug( "About to acquire semaphore. Capacity is {}", capacitySemaphore.availablePermits() );
}
capacitySemaphore.acquire();
// release the semaphore; we only acquire it as a way to stall the
// loop when there's no capacity
capacitySemaphore.release();
int capacity = capacitySemaphore.availablePermits();
if (logger.isDebugEnabled()) {
logger.debug("Capacity is {}", capacity);
}
activeJobs = jobAccessor.getJobs( capacity );
// nothing to do, we don't have any jobs to run
if ( activeJobs.isEmpty() ) {
if (logger.isDebugEnabled()) {
logger.debug("No jobs returned. Exiting run loop");
}
return;
}
for ( JobDescriptor jd : activeJobs ) {
logger.info( "Submitting work for {}", jd );
submitWork( jd );
logger.info( "Work submitted for {}", jd );
}
}
}
catch ( Throwable t ) {
if (logger.isDebugEnabled()) {
logger.debug("Scheduler run failed, error is", t);
}
}
}
/*
* (non-Javadoc)
*
* @see com.google.common.util.concurrent.AbstractScheduledService#scheduler()
*/
@Override
protected Scheduler scheduler() {
return Scheduler.newFixedDelaySchedule( 0, interval, TimeUnit.MILLISECONDS );
}
/**
* Use the provided JobFactory to build and submit Job items as ListenableFuture objects
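*
* <p>For illustration only: a hypothetical Job the factory might return (the class name is made up,
* and the sketch assumes the {@code execute} and {@code dead} callbacks invoked below are the full contract):</p>
*
* <pre>{@code
* public class ExampleJob implements Job {
*     public void execute( JobExecution execution ) throws Exception {
*         // perform the actual work; a thrown exception fails this execution
*     }
*
*     public void dead( JobExecution execution ) throws Exception {
*         // invoked once the execution has exceeded maxFailCount
*     }
* }
* }</pre>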
*/
private void submitWork( final JobDescriptor jobDescriptor ) {
final Job job;
try {
job = jobFactory.jobsFrom( jobDescriptor );
}
catch ( JobNotFoundException e ) {
logger.error( "Could not create jobs", e );
return;
}
// The job execution must be visible to both the task and the callback so that,
// regardless of any error, we can mark the job as failed if required
final JobExecution execution = new JobExecutionImpl( jobDescriptor );
// Capture the listener reference once; atomicity doesn't matter (not worth a lock object),
// we just need to prevent NPEs if the listener is swapped concurrently
final JobListener currentListener = this.jobListener;
/**
* Acquire the semaphore before we schedule. This way we won't take jobs from the queue that end up
* stuck waiting for the scheduler and then time out their distributed heartbeat
*/
try {
capacitySemaphore.acquire();
}
catch ( InterruptedException e ) {
logger.error( "Unable to acquire semaphore capacity before submitting job", e );
//just return, they'll get picked up again later
return;
}
final Timer.Context timer = jobTimer.time();
ListenableFuture<Void> future = service.submit( new Callable<Void>() {
@Override
public Void call() throws Exception {
if (logger.isDebugEnabled()) {
logger.debug("Starting the job with job id {}", execution.getJobId());
}
runCounter.inc();
execution.start( maxFailCount );
//this job is dead, treat it as such
if ( execution.getStatus() == Status.DEAD ) {
try {
job.dead( execution );
jobAccessor.save( execution );
}
catch ( Exception t ) {
//we purposefully swallow all exceptions here, we don't want them to affect the outcome
//of finally popping this job from the queue
logger.error( "Unable to invoke dead event on job", t );
}
return null;
}
jobAccessor.save( execution );
// TODO wrap and throw specifically typed exception for onFailure,
// needs jobId
logger.info( "Starting job {} with execution data {}", job, execution );
job.execute( execution );
if ( currentListener != null ) {
currentListener.onSubmit( execution );
}
return null;
}
} );
Futures.addCallback( future, new FutureCallback<Void>() {
@Override
public void onSuccess( Void param ) {
/**
* Release semaphore first in case there are other problems with communicating with Cassandra
*/
if (logger.isDebugEnabled()) {
logger.debug("Job succeeded with the job id {}", execution.getJobId());
}
capacitySemaphore.release();
timer.stop();
runCounter.dec();
successCounter.inc();
//TODO: refactor the completion check into the execution itself
if ( execution.getStatus() == Status.IN_PROGRESS ) {
logger.info( "Successful completion of bulkJob {}", execution );
execution.completed();
}
jobAccessor.save( execution );
if ( currentListener != null ) {
currentListener.onSuccess( execution );
}
}
@Override
public void onFailure( Throwable throwable ) {
/**
* Release semaphore first in case there are other problems with communicating with Cassandra
*/
logger.error( "Job failed with the job id {}", execution.getJobId() );
capacitySemaphore.release();
timer.stop();
runCounter.dec();
failCounter.inc();
logger.error( "Failed execution for bulkJob", throwable );
// mark it as failed
if ( execution.getStatus() == Status.IN_PROGRESS ) {
execution.failed();
}
jobAccessor.save( execution );
if ( currentListener != null ) {
currentListener.onFailure( execution );
}
}
} );
}
/**
* @param milliseconds the number of milliseconds to wait between iterations when no job was received
*/
public void setInterval( long milliseconds ) {
this.interval = milliseconds;
}
public long getInterval() {
return interval;
}
/**
* @param workerSize the number of worker threads (and concurrent job capacity) to use
*/
public void setWorkerSize( int workerSize ) {
this.workerSize = workerSize;
}
public int getWorkerSize() {
return workerSize;
}
/**
* @param jobAccessor the jobAccessor to set
*/
public void setJobAccessor( JobAccessor jobAccessor ) {
this.jobAccessor = jobAccessor;
}
/**
* @param jobFactory the jobFactory to set
*/
public void setJobFactory( JobFactory jobFactory ) {
this.jobFactory = jobFactory;
}
/**
* @param maxFailCount the maxFailCount to set
*/
public void setMaxFailCount( int maxFailCount ) {
this.maxFailCount = maxFailCount;
}
// /**
//  * Set the metrics factory
//  */
// public void setMetricsFactory( MetricsFactory metricsFactory ) {
//
// jobTimer = metricsFactory.getTimer( JobSchedulerService.class, "job_execution_timer" );
// runCounter = metricsFactory.getCounter( JobSchedulerService.class, "running_workers" );
// successCounter = metricsFactory.getCounter( JobSchedulerService.class, "successful_jobs" );
// failCounter = metricsFactory.getCounter( JobSchedulerService.class, "failed_jobs" );
// }
/*
* (non-Javadoc)
*
* @see com.google.common.util.concurrent.AbstractScheduledService#startUp()
*/
@Override
protected void startUp() throws Exception {
service = MoreExecutors
.listeningDecorator( Executors.newScheduledThreadPool( workerSize, JobThreadFactory.INSTANCE ) );
capacitySemaphore = new Semaphore( workerSize );
logger.info( "Starting executor pool. Capacity is {}", workerSize );
super.startUp();
logger.info( "Job Scheduler started" );
}
/*
* (non-Javadoc)
*
* @see com.google.common.util.concurrent.AbstractScheduledService#shutDown()
*/
@Override
protected void shutDown() throws Exception {
logger.info( "Shutting down job scheduler" );
service.shutdown();
logger.info( "Job scheduler shut down" );
super.shutDown();
}
/**
* Sets the JobListener notified of Job events on this SchedulerService.
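*
* <p>Illustrative only, assuming {@code MyJobListener} implements the {@code onSubmit},
* {@code onSuccess} and {@code onFailure} callbacks this service invokes:</p>
*
* <pre>{@code
* JobListener previous = schedulerService.setJobListener( new MyJobListener() );
* }</pre>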
*
* @param jobListener the listener to receive Job events
*
* @return the previous listener if set, or null if none was set
*/
public JobListener setJobListener( JobListener jobListener ) {
JobListener old = this.jobListener;
this.jobListener = jobListener;
return old;
}
/**
* Gets the current JobListener to be notified of Job events on this SchedulerService.
*
* @return the current JobListener or null if none was set
*/
public JobListener getJobListener() {
return jobListener;
}
public void setInjector(Injector injector) {
this.injector = injector;
}
/**
* Simple factory for labeling job worker threads for easier debugging
*/
private static final class JobThreadFactory implements ThreadFactory {
public static final JobThreadFactory INSTANCE = new JobThreadFactory();
private static final String NAME = "JobWorker-";
private final AtomicLong counter = new AtomicLong();
@Override
public Thread newThread( final Runnable r ) {
Thread newThread = new Thread( r, NAME + counter.incrementAndGet() );
newThread.setDaemon( true );
return newThread;
}
}
}