blob: c53f010360cdf1e07585575afefbbf5e3096a21f [file] [log] [blame]
/* $Id$
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.etch.util.core.nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import org.apache.etch.util.Monitor;
import org.apache.etch.util.Resources;
import org.apache.etch.util.Todo;
import org.apache.etch.util.TodoManager;
import org.apache.etch.util.URL;
import org.apache.etch.util.core.io.Session;
import org.apache.etch.util.core.io.SessionListener;
import org.apache.etch.util.core.io.TcpConnection;
import org.apache.etch.util.core.io.Transport;
/**
* Selector based TCP listener.
*
*/
public class Tcp2Listener implements Transport<SessionListener<SocketChannel>>,
AcceptHandlerFactory
{
/**
* @param uri
* @param resources
*/
public Tcp2Listener( String uri, Resources resources )
{
this( new URL( uri ), resources );
}
/**
* @param uri
* @param resources
*/
public Tcp2Listener( URL uri, Resources resources )
{
this( uri.getHost(), uri.getPort(), uri
.getIntegerTerm( "backlog", 0 ), resources );
}
private Tcp2Listener( String host, Integer port, int backlog,
Resources resources )
{
if (port == null)
throw new IllegalArgumentException( "port not specified" );
if (port < 0 || port > 65535)
throw new IllegalArgumentException( "port < 0 || port > 65535" );
if (backlog < 0)
throw new IllegalArgumentException( "backlog < 0" );
selector = (SuperSelector) resources.get( "selector" );
this.host = host;
this.port = port;
this.backlog = backlog;
}
private final SuperSelector selector;
private final String host;
private final int port;
private final int backlog;
/**
* Host name to specify to select listening on all interfaces. The value is
* "0.0.0.0".
*/
public static final String ALL_INTFS = "0.0.0.0";
/**
* @param s
* @return null if s is null or ALL_INTFS ("0.0.0.0").
*/
protected static String translateHost( String s )
{
if (s != null && s.equals( ALL_INTFS ))
return null;
return s;
}
@Override
public String toString()
{
ServerSocket s = null;
if (s != null)
return String.format( "Tcp2Listener(up, %s, %d)", s
.getInetAddress(), s.getLocalPort() );
return String.format( "Tcp2Listener(down, %s, %d)", host, port );
}
// ///////////
// SESSION //
// ///////////
public SessionListener<SocketChannel> getSession()
{
return session;
}
public void setSession( SessionListener<SocketChannel> session )
{
this.session = session;
}
private SessionListener<SocketChannel> session;
// /////////////
// TRANSPORT //
// /////////////
public Object transportQuery( Object query ) throws Exception
{
if (query == TcpConnection.LOCAL_ADDRESS)
return localAddress();
if (query instanceof WaitUp)
{
waitUp( ((WaitUp) query).maxDelay );
return null;
}
if (query instanceof WaitDown)
{
waitDown( ((WaitDown) query).maxDelay );
return null;
}
throw new UnsupportedOperationException( "unknown query: " + query );
}
public void transportControl( Object control, Object value )
throws Exception
{
if (control == START)
{
start();
return;
}
if (control == START_AND_WAIT_UP)
{
start();
waitUp( (Integer) value );
return;
}
if (control == STOP)
{
stop();
return;
}
if (control == STOP_AND_WAIT_DOWN)
{
stop();
waitDown( (Integer) value );
return;
}
throw new UnsupportedOperationException( "unknown control: " + control );
}
public void transportNotify( Object event ) throws Exception
{
// nothing to do.
}
// //////////////////
// IMPLEMENTATION //
// //////////////////
void start() throws Exception
{
synchronized (startedSync)
{
if (started)
throw new IllegalStateException( "started" );
started = true;
restart();
}
}
private void restart() throws Exception
{
// fireUp will save the handler.
try
{
selector.newAcceptHandler(
new InetSocketAddress( host, port ), backlog, this );
}
catch ( Error e )
{
// This is a hack work around for jdk_1.5.0_15, which has no
// translation for the error message and turns it into an
// Error.
String msg = e.getMessage();
if (msg == null || !msg.equals( "Untranslated exception" ))
throw e;
Throwable x = e.getCause();
if (x == null || !(x instanceof SocketException))
throw e;
throw (SocketException) x;
}
}
private boolean started;
private final Object startedSync = new Object();
private MyAcceptHandler handler;
private final Object handlerSync = new Object();
void stop() throws IOException
{
started = false;
MyAcceptHandler h = setHandler( null );
if (h != null)
{
h.cancel();
}
}
private MyAcceptHandler setHandler( MyAcceptHandler newHandler )
{
synchronized (handlerSync)
{
if (newHandler == handler)
return null;
if (newHandler != null && handler != null)
throw new IllegalStateException(
"newHandler != null && handler != null && newHandler != handler" );
MyAcceptHandler oldHandler = handler;
handler = newHandler;
return oldHandler;
}
}
private void fireUp( MyAcceptHandler h )
{
setHandler( h );
status.set( Session.UP );
TodoManager.addTodo( new Todo()
{
public void doit( TodoManager m ) throws Exception
{
session.sessionNotify( Session.UP );
}
public void exception( TodoManager m, Exception e )
{
e.printStackTrace();
}
} );
}
private void fireDown()
{
setHandler( null );
status.set( Session.DOWN );
TodoManager.addTodo( new Todo()
{
public void doit( TodoManager m ) throws Exception
{
session.sessionNotify( Session.DOWN );
}
public void exception( TodoManager m, Exception e )
{
e.printStackTrace();
}
} );
}
void waitUp( int maxDelay ) throws Exception
{
status.waitUntilEq( Session.UP, maxDelay );
}
void waitDown( int maxDelay ) throws Exception
{
status.waitUntilEq( Session.DOWN, maxDelay );
}
private final Monitor<String> status = new Monitor<String>( "status",
Session.DOWN );
SocketAddress localAddress() throws IOException
{
return checkSocket().getLocalSocketAddress();
}
private ServerSocket checkSocket() throws IOException
{
ServerSocket s = checkChannel().socket();
if (s == null)
throw new IOException( "no socket" );
return s;
}
private ServerSocketChannel checkChannel() throws IOException
{
ServerSocketChannel c = checkHandler().channel();
if (c == null)
throw new IOException( "c == null" );
return c;
}
private AcceptHandler checkHandler() throws IOException
{
AcceptHandler h = handler;
if (h == null)
throw new IOException( "h == null" );
return h;
}
public AcceptHandler newAcceptHandler( ServerSocketChannel channel )
throws Exception
{
return new MyAcceptHandler( channel, null );
}
/** */
public class MyAcceptHandler extends AcceptHandler
{
/**
* @param channel
* @param factory
*/
public MyAcceptHandler( ServerSocketChannel channel,
StreamHandlerFactory factory )
{
super( channel, factory );
}
@Override
protected void registered()
{
super.registered();
fireUp( this );
}
@Override
protected void acceptedChannel( SocketChannel sc ) throws Exception
{
session.sessionAccepted( sc );
}
@Override
public void canceled( Exception e )
{
super.canceled( e );
fireDown();
}
}
boolean isStarted()
{
return started;
}
/**
* @return the host interface to listen on.
*/
public String getHost()
{
return host;
}
/**
* @return the port to listen on.
*/
public int getPort()
{
return port;
}
/**
* @return the backlog (the number of unaccepted connections to allow).
*/
public int getBacklog()
{
return backlog;
}
}