blob: cf9ed2377ce9fadb3451cf7897d5b97b43b4760a [file] [log] [blame]
/* $Id$
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.etch.util.core.nio;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import org.apache.etch.util.AlarmListener;
import org.apache.etch.util.AlarmManager;
import org.apache.etch.util.Todo;
import org.apache.etch.util.TodoManager;
/**
* A shared pool of buffers.
*/
public class ByteBufferPool implements AlarmListener
{
    /**
     * Constructs a ByteBufferPool.
     * @param bufferSize the size of each individual buffer.
     * @param min the minimum number of buffers to keep in the pool.
     * @param limit the maximum number of buffers to allow.
     * @param interval the number of milliseconds between history tickle.
     * @param length the length of history to maintain.
     * @throws IllegalArgumentException if {@code bufferSize <= 0} or
     * {@code interval < 0}.
     */
    public ByteBufferPool( int bufferSize, int min, int limit,
        int interval, int length )
    {
        if (bufferSize <= 0)
            throw new IllegalArgumentException( "bufferSize <= 0" );

        if (interval < 0)
            throw new IllegalArgumentException( "interval < 0" );

        if (interval == 0 || length == 0)
        {
            // if either is 0, force both to 0.
            interval = 0;
            length = 0;
        }

        this.bufferSize = bufferSize;
        this.interval = interval;
        history = new History( min, limit, length );

        // only schedule the periodic wakeup when history tickling is enabled.
        if (interval > 0)
            AlarmManager.staticAdd( this, null, interval );
    }

    /** The size of buffers to allocate. */
    private final int bufferSize;

    /** Time in milliseconds between history tickles. */
    private final int interval;

    /**
     * Bookkeeping for the pool: constructed from min, limit and length, and
     * consulted for the used / allocated counts and the suggested number of
     * buffers to keep saved.
     */
    private final History history;

    /**
     * @return the size of the buffers being managed.
     */
    public int bufferSize()
    {
        return bufferSize;
    }

    /**
     * @return the minimum number of buffers to maintain.
     */
    public int min()
    {
        return history.min();
    }

    /**
     * @return the maximum number of buffers to allocate.
     */
    public int limit()
    {
        return history.limit();
    }

    /**
     * @return the history tickle interval in ms.
     */
    public int interval()
    {
        return interval;
    }

    /**
     * @return the history tickle length.
     */
    public int length()
    {
        return history.length();
    }

    /**
     * Allocates a buffer from the pool.
     *
     * @param notify object to be notified when a buffer is released into the
     * pool.
     * @return a buffer from the pool or null if the pool is empty and notify is
     * not null.
     * @throws IOException the pool is empty and notify is null.
     */
    public ByteBuffer alloc( Notify notify ) throws IOException
    {
        synchronized (saved)
        {
            // reserve one slot first; the history refuses when none is left.
            if (!history.used( 1 ))
            {
                if (notify == null)
                    throw new IOException( "out of buffers" );

                // notify != null
                register( notify );
                return null;
            }

            try
            {
                return allocBuf();
            }
            catch ( Error e )
            {
                // give the reserved slot back before propagating.
                history.used( -1 );
                throw e;
            }
            catch ( RuntimeException e )
            {
                // give the reserved slot back before propagating.
                history.used( -1 );
                throw e;
            }
        }
    }

    /**
     * Allocates some buffers from the pool. Either all of the requested
     * buffers are returned or none are.
     *
     * @param notify object to be notified when a buffer is released into the
     * pool.
     * @param count the number of buffers required.
     * @return an array of count allocated buffers, or null if the buffers
     * could not be reserved and notify is not null (notify has then been
     * registered).
     * @throws IOException the buffers could not be reserved and notify is
     * null.
     * @throws IllegalArgumentException if {@code count <= 0} or count is
     * greater than the limit.
     */
    public ByteBuffer[] alloc( Notify notify, int count ) throws IOException
    {
        if (count <= 0)
            throw new IllegalArgumentException( "count <= 0" );

        if (count > history.limit())
            throw new IllegalArgumentException( "count > limit" );

        synchronized (saved)
        {
            // reserve all the slots up front.
            if (!history.used( count ))
            {
                if (notify == null)
                    throw new IOException( "out of buffers" );

                // notify != null
                register( notify );
                return null;
            }

            ByteBuffer[] bufs = new ByteBuffer[count];
            // number of leading slots of bufs which actually hold a buffer.
            int filled = 0;
            try
            {
                while (filled < count)
                {
                    bufs[filled] = allocBuf();
                    filled++;
                }
                return bufs;
            }
            catch ( Error e )
            {
                abortAlloc( bufs, filled );
                throw e;
            }
            catch ( RuntimeException e )
            {
                abortAlloc( bufs, filled );
                throw e;
            }
        }
    }

    /**
     * Undoes a partially completed multi-buffer allocation. The caller must
     * hold the lock on saved.
     *
     * @param bufs the array being filled; entries from filled onward are null.
     * @param filled the number of leading entries which hold a buffer.
     */
    private void abortAlloc( ByteBuffer[] bufs, int filled )
    {
        // releaseBuf ignores null entries, so the reservations for the slots
        // which never received a buffer must be returned explicitly, else the
        // used count would stay too high forever.
        for (int i = filled; i < bufs.length; i++)
            history.used( -1 );

        // the buffers we did obtain go back through the normal release path.
        release( bufs );
    }

    /**
     * Releases a buffer back into the pool. If there are waiters for a buffer
     * being released into the pool, they are notified.
     *
     * @param buf the buffer to release. A null buffer is ignored, but waiters
     * are still notified.
     */
    public void release( ByteBuffer buf )
    {
        synchronized (saved)
        {
            releaseBuf( buf );
            notifyWaiters();
        }
    }

    /**
     * Releases some buffers back into the pool. If there are waiters for a
     * buffer being released into the pool, they are notified.
     *
     * @param bufs the buffers to release. A null or empty array is ignored,
     * as are null elements.
     */
    public void release( ByteBuffer[] bufs )
    {
        if (bufs == null || bufs.length == 0)
            return;

        synchronized (saved)
        {
            for (ByteBuffer buf : bufs)
                releaseBuf( buf );
            notifyWaiters();
        }
    }

    /**
     * Obtains one buffer, reusing a saved one when possible. The caller must
     * hold the lock on saved and must already have reserved the slot with
     * history.used.
     *
     * @return a cleared saved buffer, or a newly allocated direct buffer of
     * bufferSize bytes.
     */
    private ByteBuffer allocBuf()
    {
        // used has already been incremented.
        // we need a buffer, get a saved one if any...
        if (!saved.isEmpty())
        {
            // take from the end of the list.
            ByteBuffer buf = saved.remove( saved.size() - 1 );
            buf.clear();
            return buf;
        }

        // nothing saved, make a new one and count it as allocated.
        ByteBuffer buf = ByteBuffer.allocateDirect( bufferSize );
        history.alloc( 1 );
        return buf;
    }

    /**
     * Returns one buffer to the pool. The caller must hold the lock on saved.
     *
     * @param buf the buffer being returned; null is ignored (and the used
     * count is left alone).
     */
    private void releaseBuf( ByteBuffer buf )
    {
        if (buf == null)
            return;

        history.used( -1 );

        // save the buffer if there aren't already too many, drop the rest.
        if (saved.size() < history.suggested())
            saved.add( buf );
        else
            history.alloc( -1 );
    }

    /**
     * Drops saved buffers until no more than the history's suggested number
     * remain, adjusting the allocated count for each one dropped.
     */
    private void trimSaved()
    {
        synchronized (saved)
        {
            int n = history.suggested();
            while (saved.size() > n)
            {
                saved.remove( saved.size()-1 );
                history.alloc( -1 );
            }
        }
    }

    /**
     * Buffers which have been released and are being kept for reuse. Also
     * serves as the lock guarding allocation and release. Within this class
     * the lock on saved is always taken before the lock on waiters, never
     * the other way around.
     */
    private final List<ByteBuffer> saved = new ArrayList<ByteBuffer>();

    /**
     * Registers for notification of available buffer. Note that when you are
     * notified, your registration is canceled. You can always re-register in
     * the bufferAvailable method.
     *
     * @param notify the object to be notified.
     */
    public void register( Notify notify )
    {
        synchronized (waiters)
        {
            waiters.add( notify );
        }
    }

    /**
     * Removes a registration for notification of available buffer.
     *
     * @param notify the object which was previously registered.
     */
    public void unregister( Notify notify )
    {
        synchronized (waiters)
        {
            waiters.remove( notify );
        }
    }

    /**
     * Notifies every currently registered waiter and cancels their
     * registrations. The callbacks are not invoked inline; they are handed
     * to TodoManager as a single Todo.
     */
    private void notifyWaiters()
    {
        final Notify[] notifies;
        synchronized (waiters)
        {
            int n = waiters.size();
            if (n == 0)
                return;

            // snapshot and clear under the lock so that each registration
            // is delivered at most once.
            notifies = waiters.toArray( new Notify[n] );
            waiters.clear();
        }

        TodoManager.addTodo( new Todo()
        {
            public void doit( TodoManager mgr ) throws Exception
            {
                for (Notify n : notifies)
                    n.bufferAvailable( ByteBufferPool.this );
            }

            public void exception( TodoManager mgr, Exception e )
            {
                e.printStackTrace();
            }
        } );
    }

    /** Those waiting to be told that a buffer has been released. */
    private final List<Notify> waiters = new LinkedList<Notify>();

    /**
     * Alarm callback. Tickles the history and then trims the saved buffers
     * down to the suggested number.
     *
     * NOTE(review): history.tickle() is called without holding the lock on
     * saved; presumably History does its own synchronization -- confirm.
     *
     * @param manager the alarm manager (not used).
     * @param state the state given when the alarm was added (not used).
     * @param due the time the alarm was due (not used).
     * @return the tickle interval; presumably the delay in ms until the next
     * wakeup -- confirm against AlarmListener.
     */
    public int wakeup( AlarmManager manager, Object state, long due )
    {
        history.tickle();
        trimSaved();
        return interval;
    }

    /**
     * Called when this ByteBufferPool is no longer active. Removes the history
     * tickle.
     */
    public void shutdown()
    {
        AlarmManager.staticRemove( this );
    }

    /**
     * Interface to notify interested parties about available buffers.
     */
    public interface Notify
    {
        /**
         * Notifies that the pool has an available buffer.
         *
         * @param pool the pool which has the available buffer.
         */
        public void bufferAvailable( ByteBufferPool pool );
    }

    /**
     * @return the count of buffers currently allocated.
     */
    public int used()
    {
        return history.getUsed();
    }
}