Adding generics to task executor
diff --git a/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/execution/TaskQueueExecutor.java b/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/execution/TaskQueueExecutor.java
index 1d6e1b0..fffc2ff 100644
--- a/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/execution/TaskQueueExecutor.java
+++ b/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/execution/TaskQueueExecutor.java
@@ -20,6 +20,7 @@
*/
import org.apache.archiva.components.taskqueue.Task;
+import org.apache.archiva.components.taskqueue.TaskQueue;
/**
@@ -43,4 +44,12 @@
* @return true if the task was cancelled, false if the task was not executing.
*/
boolean cancelTask( T task );
+
+ TaskQueue<T> getQueue( );
+
+ TaskExecutor<T> getExecutor( );
+
+ String getName( );
+
+ boolean hasRunningTask();
}
diff --git a/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/execution/ThreadedTaskQueueExecutor.java b/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/execution/ThreadedTaskQueueExecutor.java
index 05c50da..017258c 100644
--- a/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/execution/ThreadedTaskQueueExecutor.java
+++ b/spring-taskqueue/src/main/java/org/apache/archiva/components/taskqueue/execution/ThreadedTaskQueueExecutor.java
@@ -21,6 +21,7 @@
import org.apache.archiva.components.taskqueue.Task;
import org.apache.archiva.components.taskqueue.TaskQueue;
+import org.apache.archiva.components.taskqueue.TaskQueueException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,13 +35,14 @@
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicReference;
/**
* @author <a href="mailto:trygvis@inamo.no">Trygve Laugstøl</a>
* @author <a href="mailto:kenney@codehaus.org">Kenney Westerhof</a>
*/
-public class ThreadedTaskQueueExecutor
- implements TaskQueueExecutor
+public class ThreadedTaskQueueExecutor<T extends Task>
+ implements TaskQueueExecutor<T>
{
private Logger logger = LoggerFactory.getLogger( getClass( ) );
@@ -52,12 +54,12 @@
/**
* requirement
*/
- private TaskQueue queue;
+ private TaskQueue<T> queue;
/**
* requirement
*/
- private TaskExecutor executor;
+ private TaskExecutor<T> executor;
/**
* configuration
@@ -72,7 +74,7 @@
private ExecutorService executorService;
- private Task currentTask;
+ private final AtomicReference<T> currentTask = new AtomicReference<>( );
private class ExecutorRunnable
extends Thread
@@ -86,56 +88,80 @@
{
while ( command != SHUTDOWN )
{
- final Task task;
+ final T task;
- currentTask = null;
-
- try
+ if ( currentTask.get( ) == null )
{
- task = queue.poll( 100, TimeUnit.MILLISECONDS );
- }
- catch ( InterruptedException e )
- {
- logger.info( "Executor thread interrupted, command: {}", ( command == SHUTDOWN
- ? "Shutdown"
- : command == CANCEL_TASK ? "Cancel task" : "Unknown" ) );
- continue;
- }
- if ( task == null )
- {
- continue;
- }
+ try
+ {
+ task = queue.poll( 100, TimeUnit.MILLISECONDS );
+ }
+ catch ( InterruptedException e )
+ {
+ logger.info( "Executor thread interrupted, command: {}", ( command == SHUTDOWN
+ ? "Shutdown"
+ : command == CANCEL_TASK ? "Cancel task" : "Unknown" ) );
+ continue;
+ }
- currentTask = task;
+ if ( task == null )
+ {
+ continue;
+ }
- Future future = executorService.submit( new Runnable( )
- {
- @Override
- public void run( )
+ if ( currentTask.compareAndSet( null, task ) )
{
try
{
- executor.executeTask( task );
+
+ Future future = executorService.submit( new Runnable( )
+ {
+ @Override
+ public void run( )
+ {
+ try
+ {
+ executor.executeTask( task );
+ }
+ catch ( TaskExecutionException e )
+ {
+ logger.error( "Error executing task: {}", e.getMessage( ), e );
+ }
+ }
+ } );
+
+ try
+ {
+ waitForTask( task, future );
+ }
+ catch ( ExecutionException e )
+ {
+ logger.error( "Error while waiting for task: {}", e.getMessage( ), e );
+ }
}
- catch ( TaskExecutionException e )
+ finally
{
- logger.error( "Error executing task: {}", e.getMessage( ), e );
+ currentTask.set( null );
}
}
- } );
+ else
+ {
+ logger.error( "There was another task running! " );
+ try
+ {
+ queue.put( task );
+ }
+ catch ( TaskQueueException e )
+ {
+ logger.error( "Could not put task back to queue" );
+ }
+ }
- try
- {
- waitForTask( task, future );
- }
- catch ( ExecutionException e )
- {
- logger.error( "Error executing task: {}", e.getMessage( ), e );
}
}
- currentTask = null;
+ currentTask.set( null );
logger.info( "Executor thread '{}' exited.", name );
@@ -331,9 +357,9 @@
}
@Override
- public Task getCurrentTask( )
+ public T getCurrentTask( )
{
- return currentTask;
+ return currentTask.get( );
}
@Override
@@ -342,7 +368,8 @@
return executorRunnable.cancelTask( task );
}
- public TaskQueue getQueue( )
+ @Override
+ public TaskQueue<T> getQueue( )
{
return queue;
}
@@ -352,7 +379,8 @@
this.queue = queue;
}
- public TaskExecutor getExecutor( )
+ @Override
+ public TaskExecutor<T> getExecutor( )
{
return executor;
}
@@ -362,6 +390,7 @@
this.executor = executor;
}
+ @Override
public String getName( )
{
return name;
@@ -371,4 +400,10 @@
{
this.name = name;
}
+
+ @Override
+ public boolean hasRunningTask( )
+ {
+ return this.currentTask.get( ) != null;
+ }
}