blob: 5be921d16a9b3e203d03da76d926b3f1cb3ba9a9 [file] [log] [blame]
/* $Id$
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.etch.util.core.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import org.apache.etch.util.Assertion;
/**
* Models the external api of a Selector while keeping a list of available
* selectors and their load.
*/
public class SuperSelector
{
/**
* Marks the SuperSelector as open for business.
*/
public void start()
{
checkStarted( false );
started = true;
}
private void checkStarted( boolean expected )
{
if (expected != started)
throw new IllegalStateException( expected ? "not started" : "started" );
}
private boolean started;
/**
* Shuts down the SuperSelector, closing all the existing selectors and
* handlers.
*/
public void shutdown()
{
started = false;
}
/**
* Adds the Selector to the set of available selectors, it is new created.
* @param selector
* @param avail the number of available slots.
*/
public void add( Selector selector, int avail )
{
checkStarted( true );
Key k = new Key( avail, selector );
if (!sel2keys.containsKey( selector ))
{
sel2keys.put( selector, k );
keysAdd( k );
}
}
private final Map<Selector, Key> sel2keys =
Collections.synchronizedMap( new HashMap<Selector, Key>() );
/**
* Updates the Selector's capacity in the set of available selectors.
* @param selector
* @param avail the number of available slots.
*/
public void update( Selector selector, int avail )
{
checkStarted( true );
Key k = sel2keys.get( selector );
Assertion.check( k != null, "k != null" );
synchronized (_keys)
{
if (avail != k.avail())
{
keysRemove( k );
k.setAvail( avail );
keysAdd( k );
}
}
}
/**
* Removes the Selector from the set of available selectors. It is dead.
* @param selector
*/
public void remove( Selector selector )
{
Key k = sel2keys.remove( selector );
if (k != null)
keysRemove( k );
}
private void keysAdd( Key k )
{
synchronized (_keys)
{
if (k.avail() > 0)
{
Key o = _keys.put( k, k );
// System.out.printf( "add %s => %s\n", k, o );
Assertion.check( o == null, "o == null" );
}
else
{
Assertion.check( _keys.get( k ) == null, "_keys.get( k ) == null" );
}
}
}
private void keysRemove( Key k )
{
synchronized (_keys)
{
Key o = _keys.remove( k );
// System.out.printf( "remove %s => %s\n", k, o );
Assertion.check( o == k || o == null, "o == k || o == null" );
}
}
private Selector getSelector() throws IOException
{
synchronized (_keys)
{
try
{
Key k = _keys.firstKey();
keysRemove( k );
k.setAvail( k.avail()-1 );
keysAdd( k );
return k.selector;
}
catch ( NoSuchElementException e )
{
Selector s = new Selector( _id++, this );
// constructor calls us back at #add(Selector, int)
s.start();
return s;
}
}
}
private final TreeMap<Key, Key> _keys = new TreeMap<Key, Key>();
private int _id;
/**
* Registers the handler with an available Selector. This may be called by
* an external thread or by one of the Selector threads.
* @param handler
* @throws IOException
*/
public void register( Handler<?> handler ) throws IOException
{
checkStarted( true );
getSelector().register( handler );
}
/**
* @param addr
* @param backlog
* @param factory
* @return a new AcceptHandler already registered with this selector.
* @throws Exception
*/
public AcceptHandler newAcceptHandler( InetSocketAddress addr, int backlog,
AcceptHandlerFactory factory ) throws Exception
{
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking( false );
ServerSocket ss = ssc.socket();
ss.bind( addr, backlog );
AcceptHandler ah = factory.newAcceptHandler( ssc );
register( ah );
return ah;
}
/**
* @param addr
* @param factory
* @return a new StreamHandler already registered with this selector.
* @throws Exception
*/
public StreamHandler newStreamHandler( InetSocketAddress addr,
StreamHandlerFactory factory ) throws Exception
{
SocketChannel sc = SocketChannel.open();
sc.configureBlocking( false );
StreamHandler sh = factory.newStreamHandler( sc, true );
register( sh );
sc.connect( addr );
return sh;
}
/**
* @param sc
* @param factory
* @return a new StreamHandler already registered with this selector.
* @throws Exception
*/
public StreamHandler newStreamHandler( SocketChannel sc,
StreamHandlerFactory factory ) throws Exception
{
sc.configureBlocking( false );
StreamHandler sh = factory.newStreamHandler( sc, false );
register( sh );
return sh;
}
private static class Key implements Comparable<Key>
{
public Key( int avail, Selector selector )
{
_avail = avail;
this.selector = selector;
}
private int _avail;
private final Selector selector;
@Override
public String toString()
{
return String.format( "Key(%d, %s)", _avail, selector );
}
public int avail()
{
return _avail;
}
public void setAvail( int avail )
{
_avail = avail;
}
@Override
public int hashCode()
{
return _avail ^ selector.hashCode();
}
@Override
public boolean equals( Object obj )
{
if (obj == this)
return true;
if (obj == null || obj.getClass() != Key.class)
return false;
Key k = (Key) obj;
return _avail == k._avail && selector == k.selector;
}
public int compareTo( Key k )
{
if (_avail < k._avail)
return -1;
if (_avail > k._avail)
return 1;
// avail == k.avail
if (selector.id() < k.selector.id())
return -1;
if (selector.id() > k.selector.id())
return 1;
return 0;
}
}
/**
*
*/
public void dump()
{
synchronized (_keys)
{
for (Key k: _keys.values())
System.out.printf( "key: %d %s\n", k.avail(), k.selector );
for (Key k: _keys.values())
k.selector.dump();
}
}
/**
* @return the statically configured Selector, or one newly configured if
* none.
*/
public static SuperSelector get()
{
if (selector == null)
{
synchronized (sync)
{
if (selector == null)
{
selector = new SuperSelector();
selector.start();
}
}
}
return selector;
}
/**
* Replaces the current statically configured selector with a new one.
* @param newSelector
* @return the old statically configured selected.
*/
public static SuperSelector put( SuperSelector newSelector )
{
synchronized (sync)
{
SuperSelector oldSelector = selector;
selector = newSelector;
return oldSelector;
}
}
private static SuperSelector selector;
private final static Object sync = new Object();
}