blob: 64654299f92350c24f23e7d9d1365202a0f45ef1 [file] [log] [blame]
/* $Id$
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.etch.util;
* A standalone verion of a processor for todo items.
public class TodoManager extends AbstractStartable implements Runnable
* Constructs the TodoManager.
* @param maxEntries the maximum number of entries in the queue before
* the caller of addEntry is delayed.
* @param entryDelay milliseconds to delay a caller who tries to
* add a entry over the limit.
* @param minWorkers the minimum number of workers to keep waiting.
* @param maxWorkers the maxumum number of workers to allow.
* @param workerLinger milliseconds a worker will wait for a Todo
* before considering quitting.
* @param threshold the per worker threshold for queue length. If
* queue length exceeds this amount, a new worker is added if allowed.
public TodoManager( int maxEntries, int entryDelay, int minWorkers,
int maxWorkers, int workerLinger, int threshold )
if (maxEntries < 1)
throw new IllegalArgumentException( "maxEntries < 1" );
if (entryDelay < 1)
throw new IllegalArgumentException( "entryDelay < 1" );
if (minWorkers < 0)
throw new IllegalArgumentException( "minWorkers < 0" );
if (maxWorkers < 1)
throw new IllegalArgumentException( "maxWorkers < 1" );
if (maxWorkers < minWorkers)
throw new IllegalArgumentException( "maxWorkers < minWorkers" );
if (workerLinger < 1)
throw new IllegalArgumentException( "workerLinger < 1" );
if (threshold < 0)
throw new IllegalArgumentException( "threshold < 0" );
this.maxEntries = maxEntries;
this.entryDelay = entryDelay;
this.minWorkers = minWorkers;
this.maxWorkers = maxWorkers;
this.workerLinger = workerLinger;
this.threshold = threshold;
private final int maxEntries;
private final int entryDelay;
private final int minWorkers;
private final int maxWorkers;
private final int workerLinger;
private final int threshold;
protected void start0() throws Exception
// nothing to do.
protected void stop0() throws Exception
// nothing to do.
* @param todo
* @throws InterruptedException
public synchronized void add( Todo todo ) throws InterruptedException
int n = addEntry( todo );
considerStartingAWorker( n );
if (n > maxEntries)
Thread.sleep( entryDelay );
public void run()
boolean needsAdjust = true;
Todo todo;
while ((todo = getNextTodo()) != null)
todo.doit( this );
catch ( Exception e )
todo.exception( this, e );
needsAdjust = false;
if (needsAdjust)
workers.adjust( -1 );
* @return the number of workers.
public int numWorkers()
return workers.get();
private synchronized void considerStartingAWorker( int qlen )
int n = numWorkers();
if (n >= maxWorkers)
// start a new worker if there are none or if the queue
// length per worker has exceeded the threshold.
if (n == 0 || (qlen + n - 1) / n > threshold)
private void startAWorker()
workers.adjust( 1 );
new Thread( this ).start();
private final IntCounter workers = new IntCounter();
// QUEUE //
* Adds the todo to the tail of the queue.
* @param todo the todo to add.
* @return the current queue length.
private synchronized int addEntry( Todo todo )
Entry e = new Entry();
e.todo = todo;
if (tail != null) = e;
head = e;
tail = e;
return entries.adjust( 1 );
* @return a todo from the head of the queue, or null if the
* queue is empty.
private synchronized Todo removeEntry()
if (head == null)
return null;
Entry e = head;
head =;
if (head == null)
tail = null;
entries.adjust( -1 );
return e.todo;
* @return the number of todos.
public int numEntries()
return entries.get();
private Entry head;
private Entry tail;
private final IntCounter entries = new IntCounter();
/** An entry in the todo queue */
public static class Entry
/** the todo to be performed. */
public Todo todo;
/** the next todo in the queue. */
public Entry next;
// BLAH //
private synchronized Todo getNextTodo()
Todo todo = null;
boolean lingered = false;
while (isStarted() && (todo = removeEntry()) == null)
if (lingered && workers.get() > minWorkers)
workers.adjust( -1 );
return null;
wait( workerLinger );
// we lingered. we might have been woken because
// we're stopping, or a todo might have been
// queued.
lingered = true;
catch ( InterruptedException e )
workers.adjust( -1 );
return null;
return todo;
* @param todo
public static void addTodo( Todo todo )
getTodoManager().add( todo );
catch ( Exception e )
todo.exception( null, e );
* @return the configured todo manager. if there isn't one, it makes
* one with one worker thread.
* @throws Exception if there is a problem creating the todo manager.
public static TodoManager getTodoManager() throws Exception
if (todoManager == null)
synchronized (TodoManager.class)
if (todoManager == null)
todoManager = new TodoManager( 50, 10, 0, 5, 5000, 0 );
return todoManager;
* @param newTodoManager
* @return the old todo manager.
public static synchronized TodoManager setTodoManager( TodoManager newTodoManager )
TodoManager oldTodoManager = todoManager;
todoManager = newTodoManager;
return oldTodoManager;
* Shuts down the currently configured static todo manager if any.
* @throws Exception
public static void shutdown() throws Exception
TodoManager oldTodoManager = setTodoManager( null );
if (oldTodoManager != null)
private static TodoManager todoManager;