blob: 08eb627007e908ea8534b778f66b2310198f7eb4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.zest.library.scheduler.internal;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.zest.api.concern.Concerns;
import org.apache.zest.api.configuration.Configuration;
import org.apache.zest.api.injection.scope.Structure;
import org.apache.zest.api.injection.scope.This;
import org.apache.zest.api.mixin.Mixins;
import org.apache.zest.api.structure.Module;
import org.apache.zest.api.unitofwork.NoSuchEntityException;
import org.apache.zest.api.unitofwork.UnitOfWork;
import org.apache.zest.api.unitofwork.concern.UnitOfWorkConcern;
import org.apache.zest.library.scheduler.Schedule;
import org.apache.zest.library.scheduler.Scheduler;
import org.apache.zest.library.scheduler.SchedulerConfiguration;
/**
* This composite handles the Execution of Schedules.
*
* The composite is internal and should never be used by clients.
*/
@Mixins( Execution.ExecutionMixin.class )
public interface Execution
{
void dispatchForExecution( Schedule schedule );
void start()
throws Exception;
void stop()
throws Exception;
void updateNextTime( ScheduleTime schedule ); // This method is public, only because the UnitOfWorkConcern is wanted.
class ExecutionMixin
implements Execution, Runnable
{
public static final ThreadGroup TG = new ThreadGroup( "Zest Scheduling" );
private final Object lock = new Object();
@Structure
private Module module;
@This
private Scheduler scheduler;
@This
private Configuration<SchedulerConfiguration> config;
@This
private ThreadFactory threadFactory;
@This
private RejectedExecutionHandler rejectionHandler;
private final SortedSet<ScheduleTime> timingQueue = new TreeSet<>();
private volatile boolean running;
private ThreadPoolExecutor taskExecutor;
private volatile Thread scheduleThread;
@Override
public void run()
{
running = true;
while( running )
{
try
{
ScheduleTime scheduleTime = timing();
if( scheduleTime != null )
{
waitFor( scheduleTime );
if( isTime( scheduleTime ) ) // We might have been awakened to reschedule
{
updateNextTime( scheduleTime );
}
}
else
{
waitFor( 100 );
}
}
catch( Throwable e )
{
e.printStackTrace();
}
}
}
private ScheduleTime timing()
{
synchronized( lock )
{
if( timingQueue.size() == 0 )
{
return null;
}
return timingQueue.first();
}
}
private boolean isTime( ScheduleTime scheduleTime )
{
long now = System.currentTimeMillis();
return scheduleTime.nextTime() <= now;
}
private void waitFor( ScheduleTime scheduleTime )
throws InterruptedException
{
long now = System.currentTimeMillis();
long waitingTime = scheduleTime.nextTime() - now;
waitFor( waitingTime );
}
private void waitFor( long waitingTime )
{
if( waitingTime > 0 )
{
synchronized( lock )
{
try
{
lock.wait( waitingTime );
}
catch( InterruptedException e )
{
// should be ignored.
}
}
}
}
@Override
public void updateNextTime( ScheduleTime oldScheduleTime )
{
long now = System.currentTimeMillis();
try (UnitOfWork uow = module.newUnitOfWork()) // This will discard() the UoW when block is exited. We are only doing reads, so fine.
{
submitTaskForExecution( oldScheduleTime );
Schedule schedule = uow.get( Schedule.class, oldScheduleTime.scheduleIdentity() );
long nextTime = schedule.nextRun( now + 1000 );
if( nextTime != Long.MIN_VALUE )
{
ScheduleTime newScheduleTime = new ScheduleTime( oldScheduleTime.scheduleIdentity(), nextTime );
synchronized( lock )
{
// Re-add to the Timing Queue, to re-position the sorting.
timingQueue.remove( oldScheduleTime );
timingQueue.add( newScheduleTime );
}
}
else
{
synchronized( lock )
{
timingQueue.remove( oldScheduleTime );
}
}
}
catch( NoSuchEntityException e )
{
e.printStackTrace();
// scheduler.cancelSchedule( oldScheduleTime.scheduleIdentity() );
}
}
private void submitTaskForExecution( ScheduleTime scheduleTime )
{
Runnable taskRunner = module.newTransient( Runnable.class, scheduleTime );
this.taskExecutor.submit( taskRunner );
}
public void dispatchForExecution( Schedule schedule )
{
long now = System.currentTimeMillis();
long nextRun = schedule.nextRun( now + 1000 );
if( nextRun > 0 )
{
synchronized( lock )
{
timingQueue.add( new ScheduleTime( schedule.identity().get(), nextRun ) );
lock.notifyAll();
}
}
}
@Override
public void start()
throws Exception
{
SchedulerConfiguration configuration = config.get();
Integer workersCount = configuration.workersCount().get();
Integer workQueueSize = configuration.workQueueSize().get();
createThreadPoolExecutor( workersCount, workQueueSize );
taskExecutor.prestartAllCoreThreads();
SecurityManager sm = System.getSecurityManager();
ThreadGroup threadGroup = sm != null ? sm.getThreadGroup() : TG;
scheduleThread = new Thread( threadGroup, this, "Scheduler" );
scheduleThread.start();
}
private void createThreadPoolExecutor( Integer workersCount, Integer workQueueSize )
{
int corePoolSize = 2;
if( workersCount > 4 )
{
corePoolSize = workersCount / 4 + 1;
}
if( corePoolSize > 50 )
{
corePoolSize = 20;
}
if( workersCount > 200 )
{
workersCount = 200;
}
taskExecutor = new ThreadPoolExecutor( corePoolSize, workersCount,
0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>( workQueueSize ),
threadFactory, rejectionHandler );
}
@Override
public void stop()
throws Exception
{
running = false;
synchronized( this )
{
scheduleThread.interrupt();
}
taskExecutor.shutdown();
try
{
taskExecutor.awaitTermination( 5, TimeUnit.SECONDS );
}
catch( InterruptedException e )
{
e.printStackTrace();
}
taskExecutor.shutdownNow();
}
}
}