blob: 96f639c190a613fce78fb7ae1483c3c622a08c16 [file] [log] [blame]
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package org.apache.tapestry5.ioc.internal.services.cron;
import org.apache.tapestry5.ioc.Invokable;
import org.apache.tapestry5.ioc.annotations.PostInjection;
import org.apache.tapestry5.ioc.internal.util.CollectionFactory;
import org.apache.tapestry5.ioc.services.ParallelExecutor;
import org.apache.tapestry5.ioc.services.RegistryShutdownHub;
import org.apache.tapestry5.ioc.services.cron.PeriodicExecutor;
import org.apache.tapestry5.ioc.services.cron.PeriodicJob;
import org.apache.tapestry5.ioc.services.cron.Schedule;
import org.slf4j.Logger;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class PeriodicExecutorImpl implements PeriodicExecutor, Runnable
{
private final ParallelExecutor parallelExecutor;
private final Logger logger;
// Synchronized by jobLock
private final List<Job> jobs = CollectionFactory.newList();
private final Thread thread = new Thread(this, "Tapestry PeriodicExecutor");
private transient boolean shutdown;
private static final long FIVE_MINUTES = 5 * 60 * 1000;
private final AtomicInteger jobIdAllocator = new AtomicInteger();
private final AtomicBoolean started = new AtomicBoolean();
private final Lock jobLock = new ReentrantLock();
private class Job implements PeriodicJob, Invokable<Void>
{
final int jobId = jobIdAllocator.incrementAndGet();
private final Schedule schedule;
private final String name;
private final Runnable runnableJob;
private boolean executing, canceled;
private long nextExecution;
public Job(Schedule schedule, String name, Runnable runnableJob)
{
this.schedule = schedule;
this.name = name;
this.runnableJob = runnableJob;
nextExecution = schedule.firstExecution();
}
@Override
public String getName()
{
return name;
}
public long getNextExecution()
{
try
{
jobLock.lock();
return nextExecution;
} finally
{
jobLock.unlock();
}
}
@Override
public boolean isExecuting()
{
try
{
jobLock.lock();
return executing;
} finally
{
jobLock.unlock();
}
}
@Override
public boolean isCanceled()
{
try
{
jobLock.lock();
return canceled;
} finally
{
jobLock.unlock();
}
}
@Override
public void cancel()
{
try
{
jobLock.lock();
canceled = true;
if (!executing)
{
removeJob(this);
}
// Otherwise, it will be caught when the job finishes execution.
} finally
{
jobLock.unlock();
}
}
@Override
public String toString()
{
StringBuilder builder = new StringBuilder("PeriodicJob[#").append(jobId);
builder.append(", (").append(name).append(')');
if (executing)
{
builder.append(", executing");
}
if (canceled)
{
builder.append(", canceled");
} else
{
builder.append(String.format(", next execution %Tk:%<TM:%<TS+%<TL", nextExecution));
}
return builder.append(']').toString();
}
/**
* Starts execution of the job; this sets the executing flag, calculates the next execution time,
* and uses the ParallelExecutor to run the job.
*/
void start()
{
try
{
jobLock.lock();
executing = true;
// This is a bit naive; it assumes there will not be a delay waiting to execute. There's a lot of options
// here, such as basing the next execution on the actual start time, or event actual completion time, or allowing
// overlapping executions of the Job on a more rigid schedule. Use Quartz.
nextExecution = schedule.nextExecution(nextExecution);
parallelExecutor.invoke(this);
} finally
{
jobLock.unlock();
}
if (logger.isTraceEnabled())
{
logger.trace(this + " sent for execution");
}
}
void cleanupAfterExecution()
{
try
{
if (logger.isTraceEnabled())
{
logger.trace(this + " execution complete");
}
executing = false;
if (canceled)
{
removeJob(this);
} else
{
// Again, naive but necessary.
thread.interrupt();
}
} finally
{
jobLock.unlock();
}
}
@Override
public Void invoke()
{
logger.debug("Executing job #{} ({})", jobId, name);
try
{
runnableJob.run();
} finally
{
cleanupAfterExecution();
}
return null;
}
}
public PeriodicExecutorImpl(ParallelExecutor parallelExecutor, Logger logger)
{
this.parallelExecutor = parallelExecutor;
this.logger = logger;
}
@PostInjection
public void start(RegistryShutdownHub hub)
{
hub.addRegistryShutdownListener(new Runnable()
{
@Override
public void run()
{
registryDidShutdown();
}
});
}
public void init()
{
if (!started.get())
{
started.set(true);
thread.start();
}
}
void removeJob(Job job)
{
if (logger.isDebugEnabled())
{
logger.debug("Removing " + job);
}
try
{
jobLock.lock();
jobs.remove(job);
} finally
{
jobLock.unlock();
}
}
@Override
public PeriodicJob addJob(Schedule schedule, String name, Runnable job)
{
assert schedule != null;
assert name != null;
assert job != null;
Job periodicJob = new Job(schedule, name, job);
try
{
jobLock.lock();
jobs.add(periodicJob);
} finally
{
jobLock.unlock();
}
if (logger.isDebugEnabled())
{
logger.debug("Added " + periodicJob);
}
// Wake the thread so that it can start the job, if necessary.
// Technically, this is only necessary if the new job is scheduled earlier
// than any job currently in the list of jobs, but this naive implementation
// is simpler.
thread.interrupt();
return periodicJob;
}
@Override
public void run()
{
while (!shutdown)
{
long nextExecution = executeCurrentBatch();
try
{
long delay = nextExecution - System.currentTimeMillis();
if (logger.isTraceEnabled())
{
logger.trace(String.format("Sleeping for %,d ms", delay));
}
if (delay > 0)
{
Thread.sleep(delay);
}
} catch (InterruptedException
ex)
{
// Ignored; the thread is interrupted() to shut it down,
// or to have it execute a new batch.
logger.trace("Interrupted");
}
}
}
private void registryDidShutdown()
{
shutdown = true;
thread.interrupt();
}
/**
* Finds jobs and executes jobs that are ready to be executed.
*
* @return the next execution time (from the non-executing job that is scheduled earliest for execution).
*/
private long executeCurrentBatch()
{
long now = System.currentTimeMillis();
long nextExecution = now + FIVE_MINUTES;
try
{
jobLock.lock();
// TAP5-2455
Set<Job> jobsToCancel = null;
for (Job job : jobs)
{
if (job.isExecuting())
{
continue;
}
long jobNextExecution = job.getNextExecution();
if (jobNextExecution <= 0)
{
if (jobsToCancel == null)
{
jobsToCancel = new HashSet<PeriodicExecutorImpl.Job>();
}
jobsToCancel.add(job);
} else if (jobNextExecution <= now)
{
job.start();
} else
{
nextExecution = Math.min(nextExecution, jobNextExecution);
}
}
if (jobsToCancel != null)
{
for (Job job : jobsToCancel)
{
job.cancel();
}
}
} finally
{
jobLock.unlock();
}
return nextExecution;
}
}