blob: 1382e8b675743291f6fd931b57e059a5d462e973 [file] [log] [blame]
/* $Id$
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.etch.util.core.nio;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.util.LinkedList;
import org.apache.etch.util.Assertion;
/**
 * An experiment in selection.
 *
 * <p>Each instance is a thread wrapping one {@link java.nio.channels.Selector}.
 * Other threads never touch the underlying selector directly (apart from
 * {@code wakeup()}); instead they enqueue {@link Action}s through the public
 * methods ({@link #register}, {@link #updateInterestOps}, {@link #cancel},
 * {@link #shutdown}) and the selector thread executes them at the top of each
 * pass of its loop, before blocking in {@code select()}.
 *
 * <p>The constructor reports the selector's free capacity to the owning
 * {@link SuperSelector}; the capacity is refreshed after every batch of
 * actions. The constructor does not start the thread.
 */
public class Selector extends Thread
{
    /**
     * Maximum number of keys allowed in this Selector.
     */
    public static final int MAX_KEYS = 64;

    /**
     * Constructs the selector.
     * @param id a unique id for this selector.
     * @param ss the containing super selector.
     * @throws IOException if the underlying nio selector cannot be opened.
     */
    public Selector( int id, SuperSelector ss ) throws IOException
    {
        this.id = id;
        this.ss = ss;
        selector = java.nio.channels.Selector.open();
        ssAdd();
    }

    private final int id;

    private final SuperSelector ss;

    private final java.nio.channels.Selector selector;

    /**
     * @return the unique id of this selector.
     */
    public int id()
    {
        return id;
    }

    @Override
    public String toString()
    {
        return "Selector."+id;
    }

    ////////////
    // THREAD //
    ////////////

    /**
     * Selector thread main loop: runs the queued actions, blocks in select,
     * dispatches every selected key to its handler, and repeats until the
     * underlying selector is closed. On exit (normal or not) this selector
     * is removed from the super selector and the underlying selector is
     * closed.
     */
    @Override
    public void run()
    {
        try
        {
            while (selector.isOpen())
            {
                runActions();

                int n = selectWithWorkaround();
                // Log.report( "selected", "selector", this, "n", n );
                if (n > 0)
                {
                    for (SelectionKey key : selector.selectedKeys())
                        handleSelection( key );
                    // the selected-key set is never cleared by the selector
                    // itself, so empty it before the next select.
                    selector.selectedKeys().clear();
                }
            }
        }
        catch ( ClosedSelectorException e )
        {
            // ignore: this is how the loop ends once a shutdown action has
            // closed the selector.
        }
        catch ( Exception e )
        {
            reportException( e );
        }
        finally
        {
            try
            {
                ss.remove( this );
            }
            finally
            {
                // nested so that the nio selector is released even when
                // ss.remove() throws.
                try
                {
                    // TODO remove and cancel all the keys
                    selector.close();
                }
                catch ( Exception e )
                {
                    reportException( e );
                }
            }
        }
    }

    /**
     * Blocks in {@code select()}, retrying when it fails with the spurious
     * "File exists" IOException.
     * @return the number of keys reported by {@code select()}.
     * @throws IOException any other I/O failure of the select.
     */
    private int selectWithWorkaround() throws IOException
    {
        while (true)
        {
            try
            {
                return selector.select();
            }
            catch ( IOException e )
            {
                // Workaround, see sun bug 6693490
                //System.out.println("****** caught IOException");
                String msg = e.getMessage();
                if (msg == null || !msg.contains( "File exists" ))
                    throw e;
                // otherwise fall through and select again.
            }
        }
    }

    /**
     * Prints the exception and its stack trace to System.err, tagged with
     * this selector's name.
     * @param e the exception to report.
     */
    private void reportException( Exception e )
    {
        System.err.printf( "%s: caught exception: %s\n", this, e );
        e.printStackTrace();
    }

    //////////////////
    // SAFE ACTIONS //
    //////////////////

    /**
     * Schedules a registration of the handler with this Selector. When the
     * action runs, the handler is registered here if this Selector is not
     * full; otherwise it is handed back to the super selector. If the
     * registration fails the handler is canceled with the failure.
     * @param handler the handler to register.
     * @throws IllegalStateException if this selector has been shut down.
     */
    public void register( final Handler<?> handler )
    {
        checkNotShutdown();
        enqueueAction( false, new Action()
        {
            public void run()
            {
                try
                {
                    actionRegister( handler );
                }
                catch ( Exception e )
                {
                    cancelHandler( handler, e );
                }
            }
        } );
    }

    /**
     * @throws IllegalStateException if {@link #shutdown()} has been called.
     */
    private void checkNotShutdown()
    {
        if (shutdown)
            throw new IllegalStateException( "shutdown" );
    }

    /**
     * Schedules an update to the handler's interest ops with this Selector.
     * If the update fails the handler is canceled with the failure.
     * @param handler the handler whose interest ops should be refreshed.
     */
    public void updateInterestOps( final Handler<?> handler )
    {
        enqueueAction( false, new Action()
        {
            public void run()
            {
                try
                {
                    actionUpdateInterestOps( handler );
                }
                catch ( Exception e )
                {
                    cancelHandler( handler, e );
                }
            }
        } );
    }

    /**
     * Schedules a cancel of the handler with this Selector. The cancel is
     * queued ahead of any other pending action.
     * @param handler the handler to cancel.
     */
    public void cancel( final Handler<?> handler )
    {
        enqueueAction( true, new Action()
        {
            public void run()
            {
                cancelHandler( handler, null );
            }
        } );
    }

    /**
     * Shuts down the selector. The first call marks the selector as shut
     * down (so that later {@link #register} calls fail) and schedules the
     * close of the underlying selector; subsequent calls do nothing.
     */
    public void shutdown()
    {
        if (shutdown)
            return;

        shutdown = true;
        enqueueAction( false, new Action()
        {
            public void run()
            {
                try
                {
                    actionShutdown();
                }
                catch ( Exception e )
                {
                    reportException( e );
                }
            }
        } );
    }

    /**
     * Set once by shutdown(). Volatile because it is written by the thread
     * calling shutdown() and read by the threads calling register(), with
     * no lock in common.
     */
    private volatile boolean shutdown;

    /**
     * Cancels the handler, routing any exception thrown by the cancel
     * itself to the handler's own exception reporting.
     * @param handler the handler to cancel.
     * @param cause the failure that triggered the cancel, or null for a
     * requested cancel.
     */
    private void cancelHandler( Handler<?> handler, Exception cause )
    {
        try
        {
            actionCancel( handler, cause );
        }
        catch ( Exception e )
        {
            handler.reportException( e );
        }
    }

    ////////////////////
    // UNSAFE ACTIONS //
    ////////////////////

    private void actionRegister( Handler<?> handler ) throws Exception
    {
        // this action is called from the selector thread.

        if (selector.keys().size() >= MAX_KEYS)
        {
            // no room here: give the handler back to the super selector.
            ss.register( handler );
            return;
        }

        int ops = handler.getInterestOps();
        Assertion.check( !handler.channel().isRegistered(), "!handler.channel().isRegistered()" );
        Assertion.check( handler.key() == null, "handler.key() == null" );
        SelectionKey key = handler.channel().register( selector, ops, handler );
        handler.setLastInterestOps( ops );
        handler.registered( this, key );
    }

    private void actionUpdateInterestOps( Handler<?> handler ) throws Exception
    {
        // this action is called from the selector thread.

        SelectionKey key = handler.key();
        if (key == null)
            return;

        // only touch the key when the wanted ops actually changed.
        int ops = handler.getInterestOps();
        if (ops != handler.getLastInterestOps())
        {
            key.interestOps( ops );
            handler.setLastInterestOps( ops );
        }
    }

    private void actionCancel( Handler<?> handler, Exception e ) throws Exception
    {
        // this action is called from the selector thread.
        handler.canceled( e );
    }

    private void actionShutdown() throws Exception
    {
        // this action is called from the selector thread.
        selector.close();
    }

    /////////////////
    // ACTION LIST //
    /////////////////

    /**
     * Runs every queued action in order, then reports the resulting free
     * capacity to the super selector.
     */
    private void runActions()
    {
        Action action;
        while ((action = dequeueAction()) != null)
            action.run();
        ssUpdate();
    }

    /**
     * @return the number of additional keys this selector can accept.
     */
    private int available()
    {
        return MAX_KEYS - selector.keys().size();
    }

    private void ssAdd()
    {
        ss.add( this, available() );
    }

    private void ssUpdate()
    {
        ss.update( this, available() );
    }

    /**
     * Adds an action to the queue and, if the queue was empty, wakes up the
     * selector thread so that it notices the new work.
     * @param priority true to put the action at the head of the queue,
     * false to append it.
     * @param action the action to run on the selector thread.
     */
    private void enqueueAction( boolean priority, Action action )
    {
        synchronized (actions)
        {
            boolean wasEmpty = actions.isEmpty();

            if (priority)
                actions.addFirst( action );
            else
                actions.addLast( action );

            if (wasEmpty)
                selector.wakeup();
        }
    }

    /**
     * @return the next queued action, or null if the queue is empty.
     */
    private Action dequeueAction()
    {
        synchronized (actions)
        {
            return actions.poll();
        }
    }

    /** Pending actions; every access is guarded by its own monitor. */
    private final LinkedList<Action> actions = new LinkedList<Action>();

    //////////////////////
    // HANDLE SELECTION //
    //////////////////////

    /**
     * Dispatches one selected key: notifies the attached handler and then
     * refreshes its interest ops. A key with no attachment has its channel
     * closed. Closed-channel and cancelled-key failures are ignored; any
     * other failure cancels the handler.
     * @param key a key from the selected-key set.
     */
    private void handleSelection( SelectionKey key )
    {
        Handler<?> handler = (Handler<?>) key.attachment();
        if (handler == null)
        {
            // nobody to notify, so just get rid of the channel.
            try
            {
                key.channel().close();
            }
            catch ( IOException e )
            {
                reportException( e );
            }
            return;
        }

        try
        {
            handler.selected();
            actionUpdateInterestOps( handler );
        }
        catch ( ClosedChannelException e )
        {
            // ignore
        }
        catch ( CancelledKeyException e )
        {
            // ignore
        }
        catch ( Exception e )
        {
            cancelHandler( handler, e );
        }
    }

    /**
     * Performs an action on a handler, such as registering it with the
     * selector, updating its interest ops, or closing it.
     */
    public interface Action
    {
        /**
         * Runs the action when the selector is available to control.
         */
        void run();
    }

    /**
     * Dumps the handlers in this selector to System.out, one line per key:
     * the attachment when there is one, otherwise the key's channel.
     */
    public void dump()
    {
        for (SelectionKey k: selector.keys())
        {
            Object a = k.attachment();
            if (a == null)
            {
                System.out.printf( "%s: channel %s\n", this, k.channel() );
                continue;
            }
            System.out.printf( "%s: attachment %s\n", this, a );
        }
    }
}