blob: 5be921d16a9b3e203d03da76d926b3f1cb3ba9a9 [file] [log] [blame]
/* $Id$
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.etch.util.core.nio;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.TreeMap;
import org.apache.etch.util.Assertion;
* Models the external api of a Selector while keeping a list of available
* selectors and their load.
public class SuperSelector
* Marks the SuperSelector as open for business.
public void start()
checkStarted( false );
started = true;
private void checkStarted( boolean expected )
if (expected != started)
throw new IllegalStateException( expected ? "not started" : "started" );
private boolean started;
* Shuts down the SuperSelector, closing all the existing selectors and
* handlers.
public void shutdown()
started = false;
* Adds the Selector to the set of available selectors, it is new created.
* @param selector
* @param avail the number of available slots.
public void add( Selector selector, int avail )
checkStarted( true );
Key k = new Key( avail, selector );
if (!sel2keys.containsKey( selector ))
sel2keys.put( selector, k );
keysAdd( k );
private final Map<Selector, Key> sel2keys =
Collections.synchronizedMap( new HashMap<Selector, Key>() );
* Updates the Selector's capacity in the set of available selectors.
* @param selector
* @param avail the number of available slots.
public void update( Selector selector, int avail )
checkStarted( true );
Key k = sel2keys.get( selector );
Assertion.check( k != null, "k != null" );
synchronized (_keys)
if (avail != k.avail())
keysRemove( k );
k.setAvail( avail );
keysAdd( k );
* Removes the Selector from the set of available selectors. It is dead.
* @param selector
public void remove( Selector selector )
Key k = sel2keys.remove( selector );
if (k != null)
keysRemove( k );
private void keysAdd( Key k )
synchronized (_keys)
if (k.avail() > 0)
Key o = _keys.put( k, k );
// System.out.printf( "add %s => %s\n", k, o );
Assertion.check( o == null, "o == null" );
Assertion.check( _keys.get( k ) == null, "_keys.get( k ) == null" );
private void keysRemove( Key k )
synchronized (_keys)
Key o = _keys.remove( k );
// System.out.printf( "remove %s => %s\n", k, o );
Assertion.check( o == k || o == null, "o == k || o == null" );
private Selector getSelector() throws IOException
synchronized (_keys)
Key k = _keys.firstKey();
keysRemove( k );
k.setAvail( k.avail()-1 );
keysAdd( k );
return k.selector;
catch ( NoSuchElementException e )
Selector s = new Selector( _id++, this );
// constructor calls us back at #add(Selector, int)
return s;
private final TreeMap<Key, Key> _keys = new TreeMap<Key, Key>();
private int _id;
* Registers the handler with an available Selector. This may be called by
* an external thread or by one of the Selector threads.
* @param handler
* @throws IOException
public void register( Handler<?> handler ) throws IOException
checkStarted( true );
getSelector().register( handler );
* @param addr
* @param backlog
* @param factory
* @return a new AcceptHandler already registered with this selector.
* @throws Exception
public AcceptHandler newAcceptHandler( InetSocketAddress addr, int backlog,
AcceptHandlerFactory factory ) throws Exception
ServerSocketChannel ssc =;
ssc.configureBlocking( false );
ServerSocket ss = ssc.socket();
ss.bind( addr, backlog );
AcceptHandler ah = factory.newAcceptHandler( ssc );
register( ah );
return ah;
* @param addr
* @param factory
* @return a new StreamHandler already registered with this selector.
* @throws Exception
public StreamHandler newStreamHandler( InetSocketAddress addr,
StreamHandlerFactory factory ) throws Exception
SocketChannel sc =;
sc.configureBlocking( false );
StreamHandler sh = factory.newStreamHandler( sc, true );
register( sh );
sc.connect( addr );
return sh;
* @param sc
* @param factory
* @return a new StreamHandler already registered with this selector.
* @throws Exception
public StreamHandler newStreamHandler( SocketChannel sc,
StreamHandlerFactory factory ) throws Exception
sc.configureBlocking( false );
StreamHandler sh = factory.newStreamHandler( sc, false );
register( sh );
return sh;
private static class Key implements Comparable<Key>
public Key( int avail, Selector selector )
_avail = avail;
this.selector = selector;
private int _avail;
private final Selector selector;
public String toString()
return String.format( "Key(%d, %s)", _avail, selector );
public int avail()
return _avail;
public void setAvail( int avail )
_avail = avail;
public int hashCode()
return _avail ^ selector.hashCode();
public boolean equals( Object obj )
if (obj == this)
return true;
if (obj == null || obj.getClass() != Key.class)
return false;
Key k = (Key) obj;
return _avail == k._avail && selector == k.selector;
public int compareTo( Key k )
if (_avail < k._avail)
return -1;
if (_avail > k._avail)
return 1;
// avail == k.avail
if ( <
return -1;
if ( >
return 1;
return 0;
public void dump()
synchronized (_keys)
for (Key k: _keys.values())
System.out.printf( "key: %d %s\n", k.avail(), k.selector );
for (Key k: _keys.values())
* @return the statically configured Selector, or one newly configured if
* none.
public static SuperSelector get()
if (selector == null)
synchronized (sync)
if (selector == null)
selector = new SuperSelector();
return selector;
* Replaces the current statically configured selector with a new one.
* @param newSelector
* @return the old statically configured selected.
public static SuperSelector put( SuperSelector newSelector )
synchronized (sync)
SuperSelector oldSelector = selector;
selector = newSelector;
return oldSelector;
private static SuperSelector selector;
private final static Object sync = new Object();