blob: bdd27f2d1c4f0375bed3296f55642069f3a206ae [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.transport;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoSession;
import org.apache.mina.util.BlockingQueue;
import org.apache.mina.util.ByteBufferUtil;
import org.apache.mina.util.IdentityHashSet;
import org.apache.mina.util.Queue;
import org.apache.mina.util.Stack;
/**
* A Thread-pooling filter. This filter forwards {@link IoHandler} events
* to its thread pool.
* <p/>
* This is an implementation of
* <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers
* thread pool</a> by Douglas C. Schmidt et al.
*/
public class ThreadPoolFilter extends IoFilterAdapter
{
/**
* Default maximum size of thread pool (2G).
*/
public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE;
/**
* Default keep-alive time of thread pool (1 min).
*/
public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000;
/**
* A queue which contains {@link Integer}s which represents reusable
* thread IDs. {@link Worker} first checks this queue and then
* uses {@link #threadId} when no reusable thread ID is available.
*/
private static final Queue threadIdReuseQueue = new Queue();
private static int threadId = 0;
private static int acquireThreadId()
{
synchronized (threadIdReuseQueue)
{
Integer id = (Integer) threadIdReuseQueue.pop();
if (id == null)
{
return ++ threadId;
}
else
{
return id.intValue();
}
}
}
private static void releaseThreadId(int id)
{
synchronized (threadIdReuseQueue)
{
threadIdReuseQueue.push(new Integer(id));
}
}
private final String threadNamePrefix;
private final Map buffers = new IdentityHashMap();
private final BlockingQueue unfetchedSessionBuffers = new BlockingQueue();
private final Set allSessionBuffers = new IdentityHashSet();
private Worker leader;
private final Stack followers = new Stack();
private final Set allWorkers = new IdentityHashSet();
private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;
private boolean shuttingDown;
private int poolSize;
private final Object poolSizeLock = new Object();
/**
* Creates a new instance of this filter with default thread pool settings.
*/
public ThreadPoolFilter()
{
this("IoThreadPool");
}
/**
* Creates a new instance of this filter with the specified thread name prefix
* and other default settings.
*
* @param threadNamePrefix the prefix of the thread names this pool will create.
*/
public ThreadPoolFilter(String threadNamePrefix)
{
if (threadNamePrefix == null)
{
throw new NullPointerException("threadNamePrefix");
}
threadNamePrefix = threadNamePrefix.trim();
if (threadNamePrefix.length() == 0)
{
throw new IllegalArgumentException("threadNamePrefix is empty.");
}
this.threadNamePrefix = threadNamePrefix;
}
public String getThreadNamePrefix()
{
return threadNamePrefix;
}
public int getPoolSize()
{
synchronized (poolSizeLock)
{
return poolSize;
}
}
public int getMaximumPoolSize()
{
return maximumPoolSize;
}
public int getKeepAliveTime()
{
return keepAliveTime;
}
public void setMaximumPoolSize(int maximumPoolSize)
{
if (maximumPoolSize <= 0)
{
throw new IllegalArgumentException();
}
this.maximumPoolSize = maximumPoolSize;
}
public void setKeepAliveTime(int keepAliveTime)
{
this.keepAliveTime = keepAliveTime;
}
public void init()
{
shuttingDown = false;
leader = new Worker();
leader.start();
leader.lead();
}
public void destroy()
{
shuttingDown = true;
int expectedPoolSize = 0;
while (getPoolSize() != expectedPoolSize)
{
List allWorkers;
synchronized (poolSizeLock)
{
allWorkers = new ArrayList(this.allWorkers);
}
// You may not interrupt the current thread.
if (allWorkers.remove(Thread.currentThread()))
{
expectedPoolSize = 1;
}
for (Iterator i = allWorkers.iterator(); i.hasNext();)
{
Worker worker = (Worker) i.next();
while (worker.isAlive())
{
worker.interrupt();
try
{
// This timeout will help us from
// infinite lock-up and interrupt workers again.
worker.join(100);
}
catch (InterruptedException e)
{
}
}
}
}
this.allSessionBuffers.clear();
this.unfetchedSessionBuffers.clear();
this.buffers.clear();
this.followers.clear();
this.leader = null;
}
private void increasePoolSize(Worker worker)
{
synchronized (poolSizeLock)
{
poolSize++;
allWorkers.add(worker);
}
}
private void decreasePoolSize(Worker worker)
{
synchronized (poolSizeLock)
{
poolSize--;
allWorkers.remove(worker);
}
}
private void fireEvent(NextFilter nextFilter, IoSession session,
EventType type, Object data)
{
final BlockingQueue unfetchedSessionBuffers = this.unfetchedSessionBuffers;
final Set allSessionBuffers = this.allSessionBuffers;
final Event event = new Event(type, nextFilter, data);
synchronized (unfetchedSessionBuffers)
{
final SessionBuffer buf = getSessionBuffer(session);
final Queue eventQueue = buf.eventQueue;
synchronized (buf)
{
eventQueue.push(event);
}
if (!allSessionBuffers.contains(buf))
{
allSessionBuffers.add(buf);
unfetchedSessionBuffers.push(buf);
}
}
}
/**
* Implement this method to fetch (or pop) a {@link SessionBuffer} from
* the given <tt>unfetchedSessionBuffers</tt>. The default implementation
* simply pops the buffer from it. You could prioritize the fetch order.
*
* @return A non-null {@link SessionBuffer}
*/
protected SessionBuffer fetchSessionBuffer(Queue unfetchedSessionBuffers)
{
return (SessionBuffer) unfetchedSessionBuffers.pop();
}
private SessionBuffer getSessionBuffer(IoSession session)
{
final Map buffers = this.buffers;
SessionBuffer buf = (SessionBuffer) buffers.get(session);
if (buf == null)
{
synchronized (buffers)
{
buf = (SessionBuffer) buffers.get(session);
if (buf == null)
{
buf = new SessionBuffer(session);
buffers.put(session, buf);
}
}
}
return buf;
}
private void removeSessionBuffer(SessionBuffer buf)
{
final Map buffers = this.buffers;
final IoSession session = buf.session;
synchronized (buffers)
{
buffers.remove(session);
}
}
protected static class SessionBuffer
{
private final IoSession session;
private final Queue eventQueue = new Queue();
private SessionBuffer(IoSession session)
{
this.session = session;
}
public IoSession getSession()
{
return session;
}
public Queue getEventQueue()
{
return eventQueue;
}
}
private class Worker extends Thread
{
private final int id;
private final Object promotionLock = new Object();
private boolean dead;
private Worker()
{
int id = acquireThreadId();
this.id = id;
this.setName(threadNamePrefix + '-' + id);
increasePoolSize(this);
}
public boolean lead()
{
final Object promotionLock = this.promotionLock;
synchronized (promotionLock)
{
if (dead)
{
return false;
}
leader = this;
promotionLock.notify();
}
return true;
}
public void run()
{
for (; ;)
{
if (!waitForPromotion())
{
break;
}
SessionBuffer buf = fetchBuffer();
giveUpLead();
if (buf == null)
{
break;
}
processEvents(buf);
follow();
releaseBuffer(buf);
}
decreasePoolSize(this);
releaseThreadId(id);
}
private SessionBuffer fetchBuffer()
{
BlockingQueue unfetchedSessionBuffers = ThreadPoolFilter.this.unfetchedSessionBuffers;
synchronized (unfetchedSessionBuffers)
{
while (!shuttingDown)
{
try
{
unfetchedSessionBuffers.waitForNewItem();
}
catch (InterruptedException e)
{
continue;
}
return ThreadPoolFilter.this.fetchSessionBuffer(unfetchedSessionBuffers);
}
}
return null;
}
private void processEvents(SessionBuffer buf)
{
final IoSession session = buf.session;
final Queue eventQueue = buf.eventQueue;
for (; ;)
{
Event event;
synchronized (buf)
{
event = (Event) eventQueue.pop();
if (event == null)
{
break;
}
}
processEvent(event.getNextFilter(), session,
event.getType(), event.getData());
}
}
private void follow()
{
final Object promotionLock = this.promotionLock;
final Stack followers = ThreadPoolFilter.this.followers;
synchronized (promotionLock)
{
if (this != leader)
{
synchronized (followers)
{
followers.push(this);
}
}
}
}
private void releaseBuffer(SessionBuffer buf)
{
final BlockingQueue unfetchedSessionBuffers = ThreadPoolFilter.this.unfetchedSessionBuffers;
final Set allSessionBuffers = ThreadPoolFilter.this.allSessionBuffers;
final Queue eventQueue = buf.eventQueue;
synchronized (unfetchedSessionBuffers)
{
if (eventQueue.isEmpty())
{
allSessionBuffers.remove(buf);
removeSessionBuffer(buf);
}
else
{
unfetchedSessionBuffers.push(buf);
}
}
}
private boolean waitForPromotion()
{
final Object promotionLock = this.promotionLock;
long startTime = System.currentTimeMillis();
long currentTime = System.currentTimeMillis();
synchronized (promotionLock)
{
while (this != leader && !shuttingDown)
{
// Calculate remaining keep-alive time
int keepAliveTime = getKeepAliveTime();
if (keepAliveTime > 0)
{
keepAliveTime -= (currentTime - startTime);
}
else
{
keepAliveTime = Integer.MAX_VALUE;
}
// Break the loop if there's no remaining keep-alive time.
if (keepAliveTime <= 0)
{
break;
}
// Wait for promotion
try
{
promotionLock.wait(keepAliveTime);
}
catch (InterruptedException e)
{
}
// Update currentTime for the next iteration
currentTime = System.currentTimeMillis();
}
boolean timeToLead = this == leader && !shuttingDown;
if (!timeToLead)
{
// time to die
synchronized (followers)
{
followers.remove(this);
}
// Mark as dead explicitly when we've got promotionLock.
dead = true;
}
return timeToLead;
}
}
private void giveUpLead()
{
final Stack followers = ThreadPoolFilter.this.followers;
Worker worker;
do
{
synchronized (followers)
{
worker = (Worker) followers.pop();
}
if (worker == null)
{
// Increase the number of threads if we
// are not shutting down and we can increase the number.
if (!shuttingDown
&& getPoolSize() < getMaximumPoolSize())
{
worker = new Worker();
worker.lead();
worker.start();
}
// This loop should end because:
// 1) lead() is called already,
// 2) or it is shutting down and there's no more threads left.
break;
}
}
while (!worker.lead());
}
}
protected static class EventType
{
public static final EventType OPENED = new EventType("OPENED");
public static final EventType CLOSED = new EventType("CLOSED");
public static final EventType READ = new EventType("READ");
public static final EventType WRITTEN = new EventType("WRITTEN");
public static final EventType RECEIVED = new EventType("RECEIVED");
public static final EventType SENT = new EventType("SENT");
public static final EventType IDLE = new EventType("IDLE");
public static final EventType EXCEPTION = new EventType("EXCEPTION");
private final String value;
private EventType(String value)
{
this.value = value;
}
public String toString()
{
return value;
}
}
protected static class Event
{
private final EventType type;
private final NextFilter nextFilter;
private final Object data;
public Event(EventType type, NextFilter nextFilter, Object data)
{
this.type = type;
this.nextFilter = nextFilter;
this.data = data;
}
public Object getData()
{
return data;
}
public NextFilter getNextFilter()
{
return nextFilter;
}
public EventType getType()
{
return type;
}
}
public void sessionCreated(NextFilter nextFilter, IoSession session)
{
nextFilter.sessionCreated(session);
}
public void sessionOpened(NextFilter nextFilter,
IoSession session)
{
fireEvent(nextFilter, session, EventType.OPENED, null);
}
public void sessionClosed(NextFilter nextFilter,
IoSession session)
{
fireEvent(nextFilter, session, EventType.CLOSED, null);
}
public void sessionIdle(NextFilter nextFilter,
IoSession session, IdleStatus status)
{
fireEvent(nextFilter, session, EventType.IDLE, status);
}
public void exceptionCaught(NextFilter nextFilter,
IoSession session, Throwable cause)
{
fireEvent(nextFilter, session, EventType.EXCEPTION, cause);
}
public void messageReceived(NextFilter nextFilter,
IoSession session, Object message)
{
ByteBufferUtil.acquireIfPossible(message);
fireEvent(nextFilter, session, EventType.RECEIVED, message);
}
public void messageSent(NextFilter nextFilter,
IoSession session, Object message)
{
ByteBufferUtil.acquireIfPossible(message);
fireEvent(nextFilter, session, EventType.SENT, message);
}
protected void processEvent(NextFilter nextFilter,
IoSession session, EventType type,
Object data)
{
if (type == EventType.RECEIVED)
{
nextFilter.messageReceived(session, data);
ByteBufferUtil.releaseIfPossible(data);
}
else if (type == EventType.SENT)
{
nextFilter.messageSent(session, data);
ByteBufferUtil.releaseIfPossible(data);
}
else if (type == EventType.EXCEPTION)
{
nextFilter.exceptionCaught(session, (Throwable) data);
}
else if (type == EventType.IDLE)
{
nextFilter.sessionIdle(session, (IdleStatus) data);
}
else if (type == EventType.OPENED)
{
nextFilter.sessionOpened(session);
}
else if (type == EventType.CLOSED)
{
nextFilter.sessionClosed(session);
}
}
public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest)
{
nextFilter.filterWrite(session, writeRequest);
}
public void filterClose(NextFilter nextFilter, IoSession session) throws Exception
{
nextFilter.filterClose(session);
}
}